aboutsummaryrefslogtreecommitdiff
path: root/src/cryptonote_protocol
diff options
context:
space:
mode:
authoranon <anon [at] nowhere>2021-09-20 20:58:24 +0000
committeranon <anon [at] nowhere>2021-09-20 20:58:24 +0000
commitd4c754923e7af4119c00fe9949c2defa4e6fd9bd (patch)
tree1a4f37574986f82498d183a9cf64e2d44e927d4d /src/cryptonote_protocol
parentnode_server: add race condition demo (diff)
downloadmonero-d4c754923e7af4119c00fe9949c2defa4e6fd9bd.tar.xz
node_server: fix race condition
Diffstat (limited to 'src/cryptonote_protocol')
-rw-r--r--src/cryptonote_protocol/levin_notify.cpp54
-rw-r--r--src/cryptonote_protocol/levin_notify.h3
2 files changed, 46 insertions, 11 deletions
diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp
index ab4eeeb82..8b36ca58c 100644
--- a/src/cryptonote_protocol/levin_notify.cpp
+++ b/src/cryptonote_protocol/levin_notify.cpp
@@ -298,6 +298,12 @@ namespace levin
boost::asio::steady_timer next_epoch;
boost::asio::steady_timer flush_txs;
boost::asio::io_service::strand strand;
+ struct context_t {
+ std::vector<cryptonote::blobdata> fluff_txs;
+ std::chrono::steady_clock::time_point flush_time;
+ bool m_is_income;
+ };
+ boost::unordered_map<boost::uuids::uuid, context_t> contexts;
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
@@ -374,14 +380,16 @@ namespace levin
const auto now = std::chrono::steady_clock::now();
auto next_flush = std::chrono::steady_clock::time_point::max();
std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{};
- zone_->p2p->foreach_connection([timer_error, now, &next_flush, &connections] (detail::p2p_context& context)
+ for (auto &e: zone_->contexts)
{
+ auto &id = e.first;
+ auto &context = e.second;
if (!context.fluff_txs.empty())
{
if (context.flush_time <= now || timer_error) // flush on canceled timer
{
context.flush_time = std::chrono::steady_clock::time_point::max();
- connections.emplace_back(std::move(context.fluff_txs), context.m_connection_id);
+ connections.emplace_back(std::move(context.fluff_txs), id);
context.fluff_txs.clear();
}
else // not flushing yet
@@ -389,8 +397,7 @@ namespace levin
}
else // nothing to flush
context.flush_time = std::chrono::steady_clock::time_point::max();
- return true;
- });
+ }
/* Always send with `fluff` flag, even over i2p/tor. The hidden service
will disable the forwarding delay and immediately fluff. The i2p/tor
@@ -438,22 +445,21 @@ namespace levin
MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing");
-
- zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
+ for (auto &e: zone->contexts)
{
+ auto &id = e.first;
+ auto &context = e.second;
// When i2p/tor, only fluff to outbound connections
- if (context.handshake_complete() && source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
+ if (source != id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
{
if (context.fluff_txs.empty())
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
next_flush = std::min(next_flush, context.flush_time);
context.fluff_txs.reserve(context.fluff_txs.size() + txs.size());
- for (const blobdata& tx : txs)
- context.fluff_txs.push_back(tx); // must copy instead of move (multiple conns)
+ context.fluff_txs.insert(context.fluff_txs.end(), txs.begin(), txs.end());
}
- return true;
- });
+ }
if (next_flush == std::chrono::steady_clock::time_point::max())
MWARNING("Unable to send transaction(s), no available connections");
@@ -764,6 +770,32 @@ namespace levin
);
}
+ void notify::on_handshake_complete(const boost::uuids::uuid &id, bool is_income)
+ {
+ if (!zone_)
+ return;
+
+ auto& zone = zone_;
+ zone_->strand.dispatch([zone, id, is_income]{
+ zone->contexts[id] = {
+ .fluff_txs = {},
+ .flush_time = std::chrono::steady_clock::time_point::max(),
+ .m_is_income = is_income,
+ };
+ });
+ }
+
+ void notify::on_connection_close(const boost::uuids::uuid &id)
+ {
+ if (!zone_)
+ return;
+
+ auto& zone = zone_;
+ zone_->strand.dispatch([zone, id]{
+ zone->contexts.erase(id);
+ });
+ }
+
void notify::run_epoch()
{
if (!zone_)
diff --git a/src/cryptonote_protocol/levin_notify.h b/src/cryptonote_protocol/levin_notify.h
index abbf9d461..12704746a 100644
--- a/src/cryptonote_protocol/levin_notify.h
+++ b/src/cryptonote_protocol/levin_notify.h
@@ -101,6 +101,9 @@ namespace levin
//! Probe for new outbound connection - skips if not needed.
void new_out_connection();
+ void on_handshake_complete(const boost::uuids::uuid &id, bool is_income);
+ void on_connection_close(const boost::uuids::uuid &id);
+
//! Run the logic for the next epoch immediately. Only use in testing.
void run_epoch();