aboutsummaryrefslogtreecommitdiff
path: root/src/cryptonote_core
diff options
context:
space:
mode:
authorj-berman <justinberman@protonmail.com>2022-07-09 15:08:06 -0700
committerj-berman <justinberman@protonmail.com>2022-07-21 11:53:31 -0700
commit8cc3c9af4dde7a32d675c4297d673dee4a9c2514 (patch)
treece08ab68e9425392e8ab869a6f696871beb4a89c /src/cryptonote_core
parentMerge pull request #8435 (diff)
downloadmonero-8cc3c9af4dde7a32d675c4297d673dee4a9c2514.tar.xz
Publish submitted txs via zmq
Diffstat (limited to 'src/cryptonote_core')
-rw-r--r--src/cryptonote_core/cryptonote_core.cpp51
-rw-r--r--src/cryptonote_core/cryptonote_core.h7
-rw-r--r--src/cryptonote_core/tx_pool.cpp10
-rw-r--r--src/cryptonote_core/tx_pool.h4
4 files changed, 67 insertions, 5 deletions
diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp
index a78f5d673..95cd1c83b 100644
--- a/src/cryptonote_core/cryptonote_core.cpp
+++ b/src/cryptonote_core/cryptonote_core.cpp
@@ -1406,21 +1406,66 @@ namespace cryptonote
return true;
}
//-----------------------------------------------------------------------------------------------
+ bool core::notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const
+ {
+ if (!m_zmq_pub)
+ return true;
+
+ if (tx_blobs.size() != tx_hashes.size() || tx_blobs.size() != txs.size() || tx_blobs.size() != just_broadcasted.size())
+ return false;
+
+ /* Publish txs via ZMQ that are "just broadcasted" by the daemon. This is
+ done here in addition to `handle_incoming_txs` in order to guarantee txs
+ are pub'd via ZMQ when we know the daemon has/will broadcast to other
+ nodes & *after* the tx is visible in the pool. This should get called
+ when the user submits a tx to a daemon in the "fluff" epoch relaying txs
+ via a public network. */
+ if (std::count(just_broadcasted.begin(), just_broadcasted.end(), true) == 0)
+ return true;
+
+ std::vector<txpool_event> results{};
+ results.resize(tx_blobs.size());
+ for (std::size_t i = 0; i < results.size(); ++i)
+ {
+ results[i].tx = std::move(txs[i]);
+ results[i].hash = std::move(tx_hashes[i]);
+ results[i].blob_size = tx_blobs[i].size();
+ results[i].weight = results[i].tx.pruned ? get_pruned_transaction_weight(results[i].tx) : get_transaction_weight(results[i].tx, results[i].blob_size);
+ results[i].res = just_broadcasted[i];
+ }
+
+ m_zmq_pub(std::move(results));
+
+ return true;
+ }
+ //-----------------------------------------------------------------------------------------------
void core::on_transactions_relayed(const epee::span<const cryptonote::blobdata> tx_blobs, const relay_method tx_relay)
{
+ // lock ensures duplicate txs aren't pub'd via zmq
+ CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
+
std::vector<crypto::hash> tx_hashes{};
tx_hashes.resize(tx_blobs.size());
+ std::vector<cryptonote::transaction> txs{};
+ txs.resize(tx_blobs.size());
+
for (std::size_t i = 0; i < tx_blobs.size(); ++i)
{
- cryptonote::transaction tx{};
- if (!parse_and_validate_tx_from_blob(tx_blobs[i], tx, tx_hashes[i]))
+ if (!parse_and_validate_tx_from_blob(tx_blobs[i], txs[i], tx_hashes[i]))
{
LOG_ERROR("Failed to parse relayed transaction");
return;
}
}
- m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay);
+
+ std::vector<bool> just_broadcasted{};
+ just_broadcasted.reserve(tx_hashes.size());
+
+ m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay, just_broadcasted);
+
+ if (m_zmq_pub && matches_category(tx_relay, relay_category::legacy))
+ notify_txpool_event(tx_blobs, epee::to_span(tx_hashes), epee::to_span(txs), just_broadcasted);
}
//-----------------------------------------------------------------------------------------------
bool core::get_block_template(block& b, const account_public_address& adr, difficulty_type& diffic, uint64_t& height, uint64_t& expected_reward, const blobdata& ex_nonce, uint64_t &seed_height, crypto::hash &seed_hash)
diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h
index 0b36730b6..5f134a999 100644
--- a/src/cryptonote_core/cryptonote_core.h
+++ b/src/cryptonote_core/cryptonote_core.h
@@ -1036,6 +1036,13 @@ namespace cryptonote
bool relay_txpool_transactions();
/**
+ * @brief sends notification of txpool events to subscribers
+ *
+ * @return true on success, false otherwise
+ */
+ bool notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const;
+
+ /**
* @brief checks DNS versions
*
* @return true on success, false otherwise
diff --git a/src/cryptonote_core/tx_pool.cpp b/src/cryptonote_core/tx_pool.cpp
index a68da0e62..0c18b2a34 100644
--- a/src/cryptonote_core/tx_pool.cpp
+++ b/src/cryptonote_core/tx_pool.cpp
@@ -820,8 +820,10 @@ namespace cryptonote
return true;
}
//---------------------------------------------------------------------------------
- void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method)
+ void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method, std::vector<bool> &just_broadcasted)
{
+ just_broadcasted.clear();
+
crypto::random_poisson_seconds embargo_duration{dandelionpp_embargo_average};
const auto now = std::chrono::system_clock::now();
uint64_t next_relay = uint64_t{std::numeric_limits<time_t>::max()};
@@ -831,12 +833,14 @@ namespace cryptonote
LockedTXN lock(m_blockchain.get_db());
for (const auto& hash : hashes)
{
+ bool was_just_broadcasted = false;
try
{
txpool_tx_meta_t meta;
if (m_blockchain.get_txpool_tx_meta(hash, meta))
{
// txes can be received as "stem" or "fluff" in either order
+ const bool already_broadcasted = meta.matches(relay_category::broadcasted);
meta.upgrade_relay_method(method);
meta.relayed = true;
@@ -849,6 +853,9 @@ namespace cryptonote
meta.last_relayed_time = std::chrono::system_clock::to_time_t(now);
m_blockchain.update_txpool_tx(hash, meta);
+
+ // wait until db update succeeds to ensure tx is visible in the pool
+ was_just_broadcasted = !already_broadcasted && meta.matches(relay_category::broadcasted);
}
}
catch (const std::exception &e)
@@ -856,6 +863,7 @@ namespace cryptonote
MERROR("Failed to update txpool transaction metadata: " << e.what());
// continue
}
+ just_broadcasted.emplace_back(was_just_broadcasted);
}
lock.commit();
set_if_less(m_next_check, time_t(next_relay));
diff --git a/src/cryptonote_core/tx_pool.h b/src/cryptonote_core/tx_pool.h
index 62bef6c06..65c39f87c 100644
--- a/src/cryptonote_core/tx_pool.h
+++ b/src/cryptonote_core/tx_pool.h
@@ -353,8 +353,10 @@ namespace cryptonote
*
* @param hashes list of tx hashes that are about to be relayed
* @param tx_relay update how the tx left this node
+ * @param just_broadcasted true if a tx was just broadcasted
+ *
*/
- void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay);
+ void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay, std::vector<bool> &just_broadcasted);
/**
* @brief get the total number of transactions in the pool