diff options
author | luigi1111 <luigi1111w@gmail.com> | 2022-08-07 21:32:45 -0500 |
---|---|---|
committer | luigi1111 <luigi1111w@gmail.com> | 2022-08-07 21:32:45 -0500 |
commit | 90125931ade1b3fb16eae542d5c3087e4330bd2f (patch) | |
tree | ce08ab68e9425392e8ab869a6f696871beb4a89c /src | |
parent | Merge pull request #8435 (diff) | |
parent | Publish submitted txs via zmq (diff) | |
download | monero-90125931ade1b3fb16eae542d5c3087e4330bd2f.tar.xz |
Merge pull request #8451
8cc3c9a Publish submitted txs via zmq (j-berman)
Diffstat (limited to 'src')
-rw-r--r-- | src/cryptonote_core/cryptonote_core.cpp | 51 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.h | 7 | ||||
-rw-r--r-- | src/cryptonote_core/tx_pool.cpp | 10 | ||||
-rw-r--r-- | src/cryptonote_core/tx_pool.h | 4 |
4 files changed, 67 insertions, 5 deletions
diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index a78f5d673..95cd1c83b 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -1406,21 +1406,66 @@ namespace cryptonote return true; } //----------------------------------------------------------------------------------------------- + bool core::notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const + { + if (!m_zmq_pub) + return true; + + if (tx_blobs.size() != tx_hashes.size() || tx_blobs.size() != txs.size() || tx_blobs.size() != just_broadcasted.size()) + return false; + + /* Publish txs via ZMQ that are "just broadcasted" by the daemon. This is + done here in addition to `handle_incoming_txs` in order to guarantee txs + are pub'd via ZMQ when we know the daemon has/will broadcast to other + nodes & *after* the tx is visible in the pool. This should get called + when the user submits a tx to a daemon in the "fluff" epoch relaying txs + via a public network. */ + if (std::count(just_broadcasted.begin(), just_broadcasted.end(), true) == 0) + return true; + + std::vector<txpool_event> results{}; + results.resize(tx_blobs.size()); + for (std::size_t i = 0; i < results.size(); ++i) + { + results[i].tx = std::move(txs[i]); + results[i].hash = std::move(tx_hashes[i]); + results[i].blob_size = tx_blobs[i].size(); + results[i].weight = results[i].tx.pruned ? get_pruned_transaction_weight(results[i].tx) : get_transaction_weight(results[i].tx, results[i].blob_size); + results[i].res = just_broadcasted[i]; + } + + m_zmq_pub(std::move(results)); + + return true; + } + //----------------------------------------------------------------------------------------------- void core::on_transactions_relayed(const epee::span<const cryptonote::blobdata> tx_blobs, const relay_method tx_relay) { + // lock ensures duplicate txs aren't pub'd via zmq + CRITICAL_REGION_LOCAL(m_incoming_tx_lock); + std::vector<crypto::hash> tx_hashes{}; tx_hashes.resize(tx_blobs.size()); + std::vector<cryptonote::transaction> txs{}; + txs.resize(tx_blobs.size()); + for (std::size_t i = 0; i < tx_blobs.size(); ++i) { - cryptonote::transaction tx{}; - if (!parse_and_validate_tx_from_blob(tx_blobs[i], tx, tx_hashes[i])) + if (!parse_and_validate_tx_from_blob(tx_blobs[i], txs[i], tx_hashes[i])) { LOG_ERROR("Failed to parse relayed transaction"); return; } } - m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay); + + std::vector<bool> just_broadcasted{}; + just_broadcasted.reserve(tx_hashes.size()); + + m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay, just_broadcasted); + + if (m_zmq_pub && matches_category(tx_relay, relay_category::legacy)) + notify_txpool_event(tx_blobs, epee::to_span(tx_hashes), epee::to_span(txs), just_broadcasted); } //----------------------------------------------------------------------------------------------- bool core::get_block_template(block& b, const account_public_address& adr, difficulty_type& diffic, uint64_t& height, uint64_t& expected_reward, const blobdata& ex_nonce, uint64_t &seed_height, crypto::hash &seed_hash) diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 0b36730b6..5f134a999 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -1036,6 +1036,13 @@ namespace cryptonote bool relay_txpool_transactions(); /** + * @brief sends notification of txpool events to subscribers + * + * @return true on success, false otherwise + */ + bool notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const; + + /** * @brief checks DNS versions * * @return true on success, false otherwise diff --git a/src/cryptonote_core/tx_pool.cpp b/src/cryptonote_core/tx_pool.cpp index a68da0e62..0c18b2a34 100644 --- a/src/cryptonote_core/tx_pool.cpp +++ b/src/cryptonote_core/tx_pool.cpp @@ -820,8 +820,10 @@ namespace cryptonote return true; } //--------------------------------------------------------------------------------- - void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method) + void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method, std::vector<bool> &just_broadcasted) { + just_broadcasted.clear(); + crypto::random_poisson_seconds embargo_duration{dandelionpp_embargo_average}; const auto now = std::chrono::system_clock::now(); uint64_t next_relay = uint64_t{std::numeric_limits<time_t>::max()}; @@ -831,12 +833,14 @@ namespace cryptonote LockedTXN lock(m_blockchain.get_db()); for (const auto& hash : hashes) { + bool was_just_broadcasted = false; try { txpool_tx_meta_t meta; if (m_blockchain.get_txpool_tx_meta(hash, meta)) { // txes can be received as "stem" or "fluff" in either order + const bool already_broadcasted = meta.matches(relay_category::broadcasted); meta.upgrade_relay_method(method); meta.relayed = true; @@ -849,6 +853,9 @@ namespace cryptonote meta.last_relayed_time = std::chrono::system_clock::to_time_t(now); m_blockchain.update_txpool_tx(hash, meta); + + // wait until db update succeeds to ensure tx is visible in the pool + was_just_broadcasted = !already_broadcasted && meta.matches(relay_category::broadcasted); } } catch (const std::exception &e) @@ -856,6 +863,7 @@ namespace cryptonote MERROR("Failed to update txpool transaction metadata: " << e.what()); // continue } + just_broadcasted.emplace_back(was_just_broadcasted); } lock.commit(); set_if_less(m_next_check, time_t(next_relay)); diff --git a/src/cryptonote_core/tx_pool.h b/src/cryptonote_core/tx_pool.h index 62bef6c06..65c39f87c 100644 --- a/src/cryptonote_core/tx_pool.h +++ b/src/cryptonote_core/tx_pool.h @@ -353,8 +353,10 @@ namespace cryptonote * * @param hashes list of tx hashes that are about to be relayed * @param tx_relay update how the tx left this node + * @param just_broadcasted true if a tx was just broadcasted + * */ - void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay); + void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay, std::vector<bool> &just_broadcasted); /** * @brief get the total number of transactions in the pool |