diff options
author | Alexander Blair <snipa@jagtech.io> | 2020-12-01 14:22:16 -0800 |
---|---|---|
committer | Alexander Blair <snipa@jagtech.io> | 2020-12-01 14:22:16 -0800 |
commit | f41dce49acc89bd8672f3737e8e67f0efc8aacbd (patch) | |
tree | 7ad78a68316fb1bc7b99bb186398863e96e57843 | |
parent | Merge pull request #6948 (diff) | |
parent | Fix tx flush callback queueing (diff) | |
download | monero-f41dce49acc89bd8672f3737e8e67f0efc8aacbd.tar.xz |
Merge pull request #6954
dff1d8067 Fix tx flush callback queueing (Lee Clagett)
-rw-r--r-- | src/cryptonote_protocol/levin_notify.cpp | 35 |
1 files changed, 12 insertions, 23 deletions
diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index 75ba68d3b..2d04dffb6 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -283,9 +283,10 @@ namespace levin strand(io_service), map(), channels(), - flush_time(std::chrono::steady_clock::time_point::max()), connection_count(0), + flush_callbacks(0), nzone(zone), + is_public(is_public), pad_txs(pad_txs), fluffing(false) { @@ -300,9 +301,10 @@ namespace levin boost::asio::io_service::strand strand; net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand` - std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time + std::uint32_t flush_callbacks; //!< Number of active fluff flush callbacks queued const epee::net_utils::zone nzone; //!< Zone is public ipv4/ipv6 connections, or i2p or tor + const bool is_public; //!< Zone is public ipv4/ipv6 connections const bool pad_txs; //!< Pad txs to the next boundary for privacy bool fluffing; //!< Zone is in Dandelion++ fluff epoch }; @@ -348,7 +350,6 @@ namespace levin struct fluff_flush { std::shared_ptr<detail::zone> zone_; - std::chrono::steady_clock::time_point flush_time_; static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time) { @@ -356,28 +357,21 @@ namespace levin assert(zone->strand.running_in_this_thread()); detail::zone& this_zone = *zone; - this_zone.flush_time = flush_time; + ++this_zone.flush_callbacks; this_zone.flush_txs.expires_at(flush_time); - this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time})); + this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone)})); } void operator()(const boost::system::error_code error) { - if (!zone_ || !zone_->p2p) + if (!zone_ || !zone_->flush_callbacks || --zone_->flush_callbacks || !zone_->p2p) return; assert(zone_->strand.running_in_this_thread()); const bool timer_error = bool(error); - if (timer_error) - { - if (error != boost::system::errc::operation_canceled) - throw boost::system::system_error{error, "fluff_flush timer failed"}; - - // new timer canceled this one set in future - if (zone_->flush_time < flush_time_) - return; - } + if (timer_error && error != boost::system::errc::operation_canceled) + throw boost::system::system_error{error, "fluff_flush timer failed"}; const auto now = std::chrono::steady_clock::now(); auto next_flush = std::chrono::steady_clock::time_point::max(); @@ -413,8 +407,6 @@ namespace levin if (next_flush != std::chrono::steady_clock::time_point::max()) fluff_flush::queue(std::move(zone_), next_flush); - else - zone_->flush_time = next_flush; // signal that no timer is set } }; @@ -449,13 +441,11 @@ namespace levin MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing"); - bool available = false; - zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush, &available] (detail::p2p_context& context) + zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) { // When i2p/tor, only fluff to outbound connections if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) { - available = true; if (context.fluff_txs.empty()) context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); @@ -467,10 +457,9 @@ namespace levin return true; }); - if (!available) + if (next_flush == std::chrono::steady_clock::time_point::max()) MWARNING("Unable to send transaction(s), no available connections"); - - if (next_flush < zone->flush_time) + else if (!zone->flush_callbacks || next_flush < zone->flush_txs.expires_at()) fluff_flush::queue(std::move(zone), next_flush); } }; |