diff options
-rw-r--r-- | .github/workflows/build.yml | 2 | ||||
-rw-r--r-- | src/blockchain_db/blockchain_db.h | 16 | ||||
-rw-r--r-- | src/blockchain_db/lmdb/db_lmdb.cpp | 86 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.cpp | 51 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.h | 7 | ||||
-rw-r--r-- | src/cryptonote_core/tx_pool.cpp | 10 | ||||
-rw-r--r-- | src/cryptonote_core/tx_pool.h | 4 | ||||
-rw-r--r-- | tests/README.md | 2 | ||||
-rw-r--r-- | tests/functional_tests/CMakeLists.txt | 4 | ||||
-rwxr-xr-x | tests/functional_tests/functional_tests_rpc.py | 4 | ||||
-rwxr-xr-x | tests/functional_tests/txpool.py | 20 | ||||
-rw-r--r-- | utils/python-rpc/framework/zmq.py | 49 |
12 files changed, 197 insertions, 58 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 69040d0af..ccc4f56fc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -151,7 +151,7 @@ jobs: - name: install monero dependencies run: ${{env.APT_INSTALL_LINUX}} - name: install Python dependencies - run: pip install requests psutil monotonic + run: pip install requests psutil monotonic zmq - name: tests env: CTEST_OUTPUT_ON_FAILURE: ON diff --git a/src/blockchain_db/blockchain_db.h b/src/blockchain_db/blockchain_db.h index 263948fa2..a163ef98c 100644 --- a/src/blockchain_db/blockchain_db.h +++ b/src/blockchain_db/blockchain_db.h @@ -1883,16 +1883,18 @@ public: } virtual ~db_txn_guard() { - if (active) - stop(); + stop(); } void stop() { - if (readonly) - db->block_rtxn_stop(); - else - db->block_wtxn_stop(); - active = false; + if (active) + { + if (readonly) + db->block_rtxn_stop(); + else + db->block_wtxn_stop(); + active = false; + } } void abort() { diff --git a/src/blockchain_db/lmdb/db_lmdb.cpp b/src/blockchain_db/lmdb/db_lmdb.cpp index db7fa6c7c..f80013d02 100644 --- a/src/blockchain_db/lmdb/db_lmdb.cpp +++ b/src/blockchain_db/lmdb/db_lmdb.cpp @@ -465,6 +465,32 @@ void mdb_txn_safe::increment_txns(int i) num_active_txns += i; } +#define TXN_PREFIX(flags); \ + mdb_txn_safe auto_txn; \ + mdb_txn_safe* txn_ptr = &auto_txn; \ + if (m_batch_active) \ + txn_ptr = m_write_txn; \ + else \ + { \ + if (auto mdb_res = lmdb_txn_begin(m_env, NULL, flags, auto_txn)) \ + throw0(DB_ERROR(lmdb_error(std::string("Failed to create a transaction for the db in ")+__FUNCTION__+": ", mdb_res).c_str())); \ + } \ + +#define TXN_PREFIX_RDONLY() \ + MDB_txn *m_txn; \ + mdb_txn_cursors *m_cursors; \ + mdb_txn_safe auto_txn; \ + bool my_rtxn = block_rtxn_start(&m_txn, &m_cursors); \ + if (my_rtxn) auto_txn.m_tinfo = m_tinfo.get(); \ + else auto_txn.uncheck() +#define TXN_POSTFIX_RDONLY() + +#define TXN_POSTFIX_SUCCESS() \ + do { \ + if (! m_batch_active) \ + auto_txn.commit(); \ + } while(0) + void lmdb_resized(MDB_env *env, int isactive) { mdb_txn_safe::prevent_new_txns(); @@ -713,21 +739,20 @@ uint64_t BlockchainLMDB::get_estimated_batch_size(uint64_t batch_num_blocks, uin } else { - MDB_txn *rtxn; - mdb_txn_cursors *rcurs; - bool my_rtxn = block_rtxn_start(&rtxn, &rcurs); - for (uint64_t block_num = block_start; block_num <= block_stop; ++block_num) { - // we have access to block weight, which will be greater or equal to block size, - // so use this as a proxy. If it's too much off, we might have to check actual size, - // which involves reading more data, so is not really wanted - size_t block_weight = get_block_weight(block_num); - total_block_size += block_weight; - // Track number of blocks being totalled here instead of assuming, in case - // some blocks were to be skipped for being outliers. - ++num_blocks_used; + TXN_PREFIX_RDONLY(); + for (uint64_t block_num = block_start; block_num <= block_stop; ++block_num) + { + // we have access to block weight, which will be greater or equal to block size, + // so use this as a proxy. If it's too much off, we might have to check actual size, + // which involves reading more data, so is not really wanted + size_t block_weight = get_block_weight(block_num); + total_block_size += block_weight; + // Track number of blocks being totalled here instead of assuming, in case + // some blocks were to be skipped for being outliers. + ++num_blocks_used; + } } - if (my_rtxn) block_rtxn_stop(); avg_block_size = total_block_size / (num_blocks_used ? num_blocks_used : 1); MDEBUG("average block size across recent " << num_blocks_used << " blocks: " << avg_block_size); } @@ -1678,32 +1703,6 @@ void BlockchainLMDB::unlock() check_open(); } -#define TXN_PREFIX(flags); \ - mdb_txn_safe auto_txn; \ - mdb_txn_safe* txn_ptr = &auto_txn; \ - if (m_batch_active) \ - txn_ptr = m_write_txn; \ - else \ - { \ - if (auto mdb_res = lmdb_txn_begin(m_env, NULL, flags, auto_txn)) \ - throw0(DB_ERROR(lmdb_error(std::string("Failed to create a transaction for the db in ")+__FUNCTION__+": ", mdb_res).c_str())); \ - } \ - -#define TXN_PREFIX_RDONLY() \ - MDB_txn *m_txn; \ - mdb_txn_cursors *m_cursors; \ - mdb_txn_safe auto_txn; \ - bool my_rtxn = block_rtxn_start(&m_txn, &m_cursors); \ - if (my_rtxn) auto_txn.m_tinfo = m_tinfo.get(); \ - else auto_txn.uncheck() -#define TXN_POSTFIX_RDONLY() - -#define TXN_POSTFIX_SUCCESS() \ - do { \ - if (! m_batch_active) \ - auto_txn.commit(); \ - } while(0) - // The below two macros are for DB access within block add/remove, whether // regular batch txn is in use or not. m_write_txn is used as a batch txn, even @@ -3923,13 +3922,20 @@ void BlockchainLMDB::block_rtxn_stop() const LOG_PRINT_L3("BlockchainLMDB::" << __func__); mdb_txn_reset(m_tinfo->m_ti_rtxn); memset(&m_tinfo->m_ti_rflags, 0, sizeof(m_tinfo->m_ti_rflags)); + /* cancel out the increment from rtxn_start */ + mdb_txn_safe::increment_txns(-1); } bool BlockchainLMDB::block_rtxn_start() const { MDB_txn *mtxn; mdb_txn_cursors *mcur; - return block_rtxn_start(&mtxn, &mcur); + /* auto_txn is only used for the create gate */ + mdb_txn_safe auto_txn; + bool ret = block_rtxn_start(&mtxn, &mcur); + if (ret) + auto_txn.increment_txns(1); /* remember there is an active readtxn */ + return ret; } void BlockchainLMDB::block_wtxn_start() diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index a78f5d673..95cd1c83b 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -1406,21 +1406,66 @@ namespace cryptonote return true; } //----------------------------------------------------------------------------------------------- + bool core::notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const + { + if (!m_zmq_pub) + return true; + + if (tx_blobs.size() != tx_hashes.size() || tx_blobs.size() != txs.size() || tx_blobs.size() != just_broadcasted.size()) + return false; + + /* Publish txs via ZMQ that are "just broadcasted" by the daemon. This is + done here in addition to `handle_incoming_txs` in order to guarantee txs + are pub'd via ZMQ when we know the daemon has/will broadcast to other + nodes & *after* the tx is visible in the pool. This should get called + when the user submits a tx to a daemon in the "fluff" epoch relaying txs + via a public network. */ + if (std::count(just_broadcasted.begin(), just_broadcasted.end(), true) == 0) + return true; + + std::vector<txpool_event> results{}; + results.resize(tx_blobs.size()); + for (std::size_t i = 0; i < results.size(); ++i) + { + results[i].tx = std::move(txs[i]); + results[i].hash = std::move(tx_hashes[i]); + results[i].blob_size = tx_blobs[i].size(); + results[i].weight = results[i].tx.pruned ? get_pruned_transaction_weight(results[i].tx) : get_transaction_weight(results[i].tx, results[i].blob_size); + results[i].res = just_broadcasted[i]; + } + + m_zmq_pub(std::move(results)); + + return true; + } + //----------------------------------------------------------------------------------------------- void core::on_transactions_relayed(const epee::span<const cryptonote::blobdata> tx_blobs, const relay_method tx_relay) { + // lock ensures duplicate txs aren't pub'd via zmq + CRITICAL_REGION_LOCAL(m_incoming_tx_lock); + std::vector<crypto::hash> tx_hashes{}; tx_hashes.resize(tx_blobs.size()); + std::vector<cryptonote::transaction> txs{}; + txs.resize(tx_blobs.size()); + for (std::size_t i = 0; i < tx_blobs.size(); ++i) { - cryptonote::transaction tx{}; - if (!parse_and_validate_tx_from_blob(tx_blobs[i], tx, tx_hashes[i])) + if (!parse_and_validate_tx_from_blob(tx_blobs[i], txs[i], tx_hashes[i])) { LOG_ERROR("Failed to parse relayed transaction"); return; } } - m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay); + + std::vector<bool> just_broadcasted{}; + just_broadcasted.reserve(tx_hashes.size()); + + m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay, just_broadcasted); + + if (m_zmq_pub && matches_category(tx_relay, relay_category::legacy)) + notify_txpool_event(tx_blobs, epee::to_span(tx_hashes), epee::to_span(txs), just_broadcasted); } //----------------------------------------------------------------------------------------------- bool core::get_block_template(block& b, const account_public_address& adr, difficulty_type& diffic, uint64_t& height, uint64_t& expected_reward, const blobdata& ex_nonce, uint64_t &seed_height, crypto::hash &seed_hash) diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 0b36730b6..5f134a999 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -1036,6 +1036,13 @@ namespace cryptonote bool relay_txpool_transactions(); /** + * @brief sends notification of txpool events to subscribers + * + * @return true on success, false otherwise + */ + bool notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const; + + /** * @brief checks DNS versions * * @return true on success, false otherwise diff --git a/src/cryptonote_core/tx_pool.cpp b/src/cryptonote_core/tx_pool.cpp index a68da0e62..0c18b2a34 100644 --- a/src/cryptonote_core/tx_pool.cpp +++ b/src/cryptonote_core/tx_pool.cpp @@ -820,8 +820,10 @@ namespace cryptonote return true; } //--------------------------------------------------------------------------------- - void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method) + void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method, std::vector<bool> &just_broadcasted) { + just_broadcasted.clear(); + crypto::random_poisson_seconds embargo_duration{dandelionpp_embargo_average}; const auto now = std::chrono::system_clock::now(); uint64_t next_relay = uint64_t{std::numeric_limits<time_t>::max()}; @@ -831,12 +833,14 @@ namespace cryptonote LockedTXN lock(m_blockchain.get_db()); for (const auto& hash : hashes) { + bool was_just_broadcasted = false; try { txpool_tx_meta_t meta; if (m_blockchain.get_txpool_tx_meta(hash, meta)) { // txes can be received as "stem" or "fluff" in either order + const bool already_broadcasted = meta.matches(relay_category::broadcasted); meta.upgrade_relay_method(method); meta.relayed = true; @@ -849,6 +853,9 @@ namespace cryptonote meta.last_relayed_time = std::chrono::system_clock::to_time_t(now); m_blockchain.update_txpool_tx(hash, meta); + + // wait until db update succeeds to ensure tx is visible in the pool + was_just_broadcasted = !already_broadcasted && meta.matches(relay_category::broadcasted); } } catch (const std::exception &e) @@ -856,6 +863,7 @@ namespace cryptonote MERROR("Failed to update txpool transaction metadata: " << e.what()); // continue } + just_broadcasted.emplace_back(was_just_broadcasted); } lock.commit(); set_if_less(m_next_check, time_t(next_relay)); diff --git a/src/cryptonote_core/tx_pool.h b/src/cryptonote_core/tx_pool.h index 62bef6c06..65c39f87c 100644 --- a/src/cryptonote_core/tx_pool.h +++ b/src/cryptonote_core/tx_pool.h @@ -353,8 +353,10 @@ namespace cryptonote * * @param hashes list of tx hashes that are about to be relayed * @param tx_relay update how the tx left this node + * @param just_broadcasted true if a tx was just broadcasted + * */ - void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay); + void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay, std::vector<bool> &just_broadcasted); /** * @brief get the total number of transactions in the pool diff --git a/tests/README.md b/tests/README.md index 908482c99..c63294e9b 100644 --- a/tests/README.md +++ b/tests/README.md @@ -54,7 +54,7 @@ Functional tests are located under the `tests/functional_tests` directory. Building all the tests requires installing the following dependencies: ```bash -pip install requests psutil monotonic +pip install requests psutil monotonic zmq ``` First, run a regtest daemon in the offline mode and with a fixed difficulty: diff --git a/tests/functional_tests/CMakeLists.txt b/tests/functional_tests/CMakeLists.txt index 5511cab1c..f7747b515 100644 --- a/tests/functional_tests/CMakeLists.txt +++ b/tests/functional_tests/CMakeLists.txt @@ -67,7 +67,7 @@ target_link_libraries(make_test_signature monero_add_minimal_executable(cpu_power_test cpu_power_test.cpp) find_program(PYTHON3_FOUND python3 REQUIRED) -execute_process(COMMAND ${PYTHON3_FOUND} "-c" "import requests; import psutil; import monotonic; print('OK')" OUTPUT_VARIABLE REQUESTS_OUTPUT OUTPUT_STRIP_TRAILING_WHITESPACE) +execute_process(COMMAND ${PYTHON3_FOUND} "-c" "import requests; import psutil; import monotonic; import zmq; print('OK')" OUTPUT_VARIABLE REQUESTS_OUTPUT OUTPUT_STRIP_TRAILING_WHITESPACE) if (REQUESTS_OUTPUT STREQUAL "OK") add_test( NAME functional_tests_rpc @@ -76,6 +76,6 @@ if (REQUESTS_OUTPUT STREQUAL "OK") NAME check_missing_rpc_methods COMMAND ${PYTHON3_FOUND} "${CMAKE_CURRENT_SOURCE_DIR}/check_missing_rpc_methods.py" "${CMAKE_SOURCE_DIR}") else() - message(WARNING "functional_tests_rpc and check_missing_rpc_methods skipped, needs the 'requests', 'psutil' and 'monotonic' python modules") + message(WARNING "functional_tests_rpc and check_missing_rpc_methods skipped, needs the 'requests', 'psutil', 'monotonic', and 'zmq' python modules") set(CTEST_CUSTOM_TESTS_IGNORE ${CTEST_CUSTOM_TESTS_IGNORE} functional_tests_rpc check_missing_rpc_methods) endif() diff --git a/tests/functional_tests/functional_tests_rpc.py b/tests/functional_tests/functional_tests_rpc.py index 450552cf8..eb8d51f08 100755 --- a/tests/functional_tests/functional_tests_rpc.py +++ b/tests/functional_tests/functional_tests_rpc.py @@ -47,7 +47,7 @@ WALLET_DIRECTORY = builddir + "/functional-tests-directory" FUNCTIONAL_TESTS_DIRECTORY = builddir + "/tests/functional_tests" DIFFICULTY = 10 -monerod_base = [builddir + "/bin/monerod", "--regtest", "--fixed-difficulty", str(DIFFICULTY), "--no-igd", "--p2p-bind-port", "monerod_p2p_port", "--rpc-bind-port", "monerod_rpc_port", "--zmq-rpc-bind-port", "monerod_zmq_port", "--non-interactive", "--disable-dns-checkpoints", "--check-updates", "disabled", "--rpc-ssl", "disabled", "--data-dir", "monerod_data_dir", "--log-level", "1"] +monerod_base = [builddir + "/bin/monerod", "--regtest", "--fixed-difficulty", str(DIFFICULTY), "--no-igd", "--p2p-bind-port", "monerod_p2p_port", "--rpc-bind-port", "monerod_rpc_port", "--zmq-rpc-bind-port", "monerod_zmq_port", "--zmq-pub", "monerod_zmq_pub", "--non-interactive", "--disable-dns-checkpoints", "--check-updates", "disabled", "--rpc-ssl", "disabled", "--data-dir", "monerod_data_dir", "--log-level", "1"] monerod_extra = [ ["--offline"], ["--rpc-payment-address", "44SKxxLQw929wRF6BA9paQ1EWFshNnKhXM3qz6Mo3JGDE2YG3xyzVutMStEicxbQGRfrYvAAYxH6Fe8rnD56EaNwUiqhcwR", "--rpc-payment-difficulty", str(DIFFICULTY), "--rpc-payment-credits", "5000", "--offline"], @@ -69,7 +69,7 @@ outputs = [] ports = [] for i in range(N_MONERODS): - command_lines.append([str(18180+i) if x == "monerod_rpc_port" else str(18280+i) if x == "monerod_p2p_port" else str(18380+i) if x == "monerod_zmq_port" else builddir + "/functional-tests-directory/monerod" + str(i) if x == "monerod_data_dir" else x for x in monerod_base]) + command_lines.append([str(18180+i) if x == "monerod_rpc_port" else str(18280+i) if x == "monerod_p2p_port" else str(18380+i) if x == "monerod_zmq_port" else "tcp://127.0.0.1:" + str(18480+i) if x == "monerod_zmq_pub" else builddir + "/functional-tests-directory/monerod" + str(i) if x == "monerod_data_dir" else x for x in monerod_base]) if i < len(monerod_extra): command_lines[-1] += monerod_extra[i] outputs.append(open(FUNCTIONAL_TESTS_DIRECTORY + '/monerod' + str(i) + '.log', 'a+')) diff --git a/tests/functional_tests/txpool.py b/tests/functional_tests/txpool.py index e92b5a530..b7f55d04c 100755 --- a/tests/functional_tests/txpool.py +++ b/tests/functional_tests/txpool.py @@ -35,6 +35,7 @@ from __future__ import print_function from framework.daemon import Daemon from framework.wallet import Wallet +from framework.zmq import Zmq class TransferTest(): def run_test(self): @@ -105,6 +106,10 @@ class TransferTest(): def check_txpool(self): daemon = Daemon() wallet = Wallet() + zmq = Zmq() + + zmq_topic = "json-minimal-txpool_add" + zmq.sub(zmq_topic) res = daemon.get_info() height = res.height @@ -142,6 +147,21 @@ class TransferTest(): min_bytes = min(min_bytes, x.blob_size) max_bytes = max(max_bytes, x.blob_size) + print('Checking all txs received via zmq') + for i in range(len(txes.keys())): + zmq_event = zmq.recv(zmq_topic) + assert len(zmq_event) == 1 + + zmq_tx = zmq_event[0] + + x = [x for x in res.transactions if x.id_hash == zmq_tx["id"]] + assert len(x) == 1 + + x = x[0] + assert x.blob_size == zmq_tx["blob_size"] + assert x.weight == zmq_tx["weight"] + assert x.fee == zmq_tx["fee"] + res = daemon.get_transaction_pool_hashes() assert sorted(res.tx_hashes) == sorted(txes.keys()) diff --git a/utils/python-rpc/framework/zmq.py b/utils/python-rpc/framework/zmq.py new file mode 100644 index 000000000..91ab70756 --- /dev/null +++ b/utils/python-rpc/framework/zmq.py @@ -0,0 +1,49 @@ +# Copyright (c) 2018-2022, The Monero Project + +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Class to subscribe to and receive ZMQ events.""" + +import zmq +import json + +class Zmq(object): + + def __init__(self, protocol='tcp', host='127.0.0.1', port=0, idx=0): + self.host = host + self.port = port + self.socket = zmq.Context().socket(zmq.SUB) + self.socket.connect('{protocol}://{host}:{port}'.format(protocol=protocol, host=host, port=port if port else 18480+idx)) + + def sub(self, topic): + self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) + + def recv(self, topic): + msg = self.socket.recv() + data = msg.decode().split(topic + ":")[1] + return json.loads(data) |