diff options
author | anon <anon [at] nowhere> | 2021-09-20 20:58:24 +0000 |
---|---|---|
committer | anon <anon [at] nowhere> | 2021-09-20 20:58:24 +0000 |
commit | d4c754923e7af4119c00fe9949c2defa4e6fd9bd (patch) | |
tree | 1a4f37574986f82498d183a9cf64e2d44e927d4d /src/cryptonote_protocol | |
parent | node_server: add race condition demo (diff) | |
download | monero-d4c754923e7af4119c00fe9949c2defa4e6fd9bd.tar.xz |
node_server: fix race condition
Diffstat (limited to 'src/cryptonote_protocol')
-rw-r--r-- | src/cryptonote_protocol/levin_notify.cpp | 54 | ||||
-rw-r--r-- | src/cryptonote_protocol/levin_notify.h | 3 |
2 files changed, 46 insertions, 11 deletions
diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index ab4eeeb82..8b36ca58c 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -298,6 +298,12 @@ namespace levin boost::asio::steady_timer next_epoch; boost::asio::steady_timer flush_txs; boost::asio::io_service::strand strand; + struct context_t { + std::vector<cryptonote::blobdata> fluff_txs; + std::chrono::steady_clock::time_point flush_time; + bool m_is_income; + }; + boost::unordered_map<boost::uuids::uuid, context_t> contexts; net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand` std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time @@ -374,14 +380,16 @@ namespace levin const auto now = std::chrono::steady_clock::now(); auto next_flush = std::chrono::steady_clock::time_point::max(); std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{}; - zone_->p2p->foreach_connection([timer_error, now, &next_flush, &connections] (detail::p2p_context& context) + for (auto &e: zone_->contexts) { + auto &id = e.first; + auto &context = e.second; if (!context.fluff_txs.empty()) { if (context.flush_time <= now || timer_error) // flush on canceled timer { context.flush_time = std::chrono::steady_clock::time_point::max(); - connections.emplace_back(std::move(context.fluff_txs), context.m_connection_id); + connections.emplace_back(std::move(context.fluff_txs), id); context.fluff_txs.clear(); } else // not flushing yet @@ -389,8 +397,7 @@ namespace levin } else // nothing to flush context.flush_time = std::chrono::steady_clock::time_point::max(); - return true; - }); + } /* Always send with `fluff` flag, even over i2p/tor. The hidden service will disable the forwarding delay and immediately fluff. The i2p/tor @@ -438,22 +445,21 @@ namespace levin MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing"); - - zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) + for (auto &e: zone->contexts) { + auto &id = e.first; + auto &context = e.second; // When i2p/tor, only fluff to outbound connections - if (context.handshake_complete() && source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) + if (source != id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) { if (context.fluff_txs.empty()) context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); next_flush = std::min(next_flush, context.flush_time); context.fluff_txs.reserve(context.fluff_txs.size() + txs.size()); - for (const blobdata& tx : txs) - context.fluff_txs.push_back(tx); // must copy instead of move (multiple conns) + context.fluff_txs.insert(context.fluff_txs.end(), txs.begin(), txs.end()); } - return true; - }); + } if (next_flush == std::chrono::steady_clock::time_point::max()) MWARNING("Unable to send transaction(s), no available connections"); @@ -764,6 +770,32 @@ namespace levin ); } + void notify::on_handshake_complete(const boost::uuids::uuid &id, bool is_income) + { + if (!zone_) + return; + + auto& zone = zone_; + zone_->strand.dispatch([zone, id, is_income]{ + zone->contexts[id] = { + .fluff_txs = {}, + .flush_time = std::chrono::steady_clock::time_point::max(), + .m_is_income = is_income, + }; + }); + } + + void notify::on_connection_close(const boost::uuids::uuid &id) + { + if (!zone_) + return; + + auto& zone = zone_; + zone_->strand.dispatch([zone, id]{ + zone->contexts.erase(id); + }); + } + void notify::run_epoch() { if (!zone_) diff --git a/src/cryptonote_protocol/levin_notify.h b/src/cryptonote_protocol/levin_notify.h index abbf9d461..12704746a 100644 --- a/src/cryptonote_protocol/levin_notify.h +++ b/src/cryptonote_protocol/levin_notify.h @@ -101,6 +101,9 @@ namespace levin //! Probe for new outbound connection - skips if not needed. void new_out_connection(); + void on_handshake_complete(const boost::uuids::uuid &id, bool is_income); + void on_connection_close(const boost::uuids::uuid &id); + //! Run the logic for the next epoch immediately. Only use in testing. void run_epoch(); |