aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluigi1111 <luigi1111w@gmail.com>2020-11-04 10:38:08 -0600
committerluigi1111 <luigi1111w@gmail.com>2020-11-04 10:38:08 -0600
commitef64c4e22f0dc5f99d6fd6dd0bb105b942c889ad (patch)
tree241ce1fcf9afe6b93ec92605c5e5be67dbdbcc3d
parentMerge pull request #6966 (diff)
parentFix tx flush callback queueing (diff)
downloadmonero-ef64c4e22f0dc5f99d6fd6dd0bb105b942c889ad.tar.xz
Merge pull request #6967
fa63d4b Fix tx flush callback queueing (vtnerd)
-rw-r--r--src/cryptonote_protocol/levin_notify.cpp33
1 files changed, 10 insertions, 23 deletions
diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp
index dbd11e7d0..9dfefe957 100644
--- a/src/cryptonote_protocol/levin_notify.cpp
+++ b/src/cryptonote_protocol/levin_notify.cpp
@@ -241,8 +241,8 @@ namespace levin
strand(io_service),
map(),
channels(),
- flush_time(std::chrono::steady_clock::time_point::max()),
connection_count(0),
+ flush_callbacks(0),
is_public(is_public),
pad_txs(pad_txs),
fluffing(false)
@@ -258,8 +258,8 @@ namespace levin
boost::asio::io_service::strand strand;
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
- std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
+ std::uint32_t flush_callbacks; //!< Number of active fluff flush callbacks queued
const bool is_public; //!< Zone is public ipv4/ipv6 connections
const bool pad_txs; //!< Pad txs to the next boundary for privacy
bool fluffing; //!< Zone is in Dandelion++ fluff epoch
@@ -305,7 +305,6 @@ namespace levin
struct fluff_flush
{
std::shared_ptr<detail::zone> zone_;
- std::chrono::steady_clock::time_point flush_time_;
static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time)
{
@@ -313,28 +312,21 @@ namespace levin
assert(zone->strand.running_in_this_thread());
detail::zone& this_zone = *zone;
- this_zone.flush_time = flush_time;
+ ++this_zone.flush_callbacks;
this_zone.flush_txs.expires_at(flush_time);
- this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time}));
+ this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone)}));
}
void operator()(const boost::system::error_code error)
{
- if (!zone_ || !zone_->p2p)
+ if (!zone_ || !zone_->flush_callbacks || --zone_->flush_callbacks || !zone_->p2p)
return;
assert(zone_->strand.running_in_this_thread());
const bool timer_error = bool(error);
- if (timer_error)
- {
- if (error != boost::system::errc::operation_canceled)
- throw boost::system::system_error{error, "fluff_flush timer failed"};
-
- // new timer canceled this one set in future
- if (zone_->flush_time < flush_time_)
- return;
- }
+ if (timer_error && error != boost::system::errc::operation_canceled)
+ throw boost::system::system_error{error, "fluff_flush timer failed"};
const auto now = std::chrono::steady_clock::now();
auto next_flush = std::chrono::steady_clock::time_point::max();
@@ -370,8 +362,6 @@ namespace levin
if (next_flush != std::chrono::steady_clock::time_point::max())
fluff_flush::queue(std::move(zone_), next_flush);
- else
- zone_->flush_time = next_flush; // signal that no timer is set
}
};
@@ -406,13 +396,11 @@ namespace levin
MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing");
- bool available = false;
- zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush, &available] (detail::p2p_context& context)
+ zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
{
// When i2p/tor, only fluff to outbound connections
if (source != context.m_connection_id && (zone->is_public || !context.m_is_income))
{
- available = true;
if (context.fluff_txs.empty())
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
@@ -424,10 +412,9 @@ namespace levin
return true;
});
- if (!available)
+ if (next_flush == std::chrono::steady_clock::time_point::max())
MWARNING("Unable to send transaction(s), no available connections");
-
- if (next_flush < zone->flush_time)
+ else if (!zone->flush_callbacks || next_flush < zone->flush_txs.expires_at())
fluff_flush::queue(std::move(zone), next_flush);
}
};