From 158c3ecff3a1e760b8d307c1ad29c029f669a265 Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Mon, 10 Jul 2017 15:38:56 +0100 Subject: core: thread most of handle_incoming_tx --- src/cryptonote_core/cryptonote_core.cpp | 106 ++++++++++++++++++++++++-------- src/cryptonote_core/cryptonote_core.h | 23 +++++++ 2 files changed, 104 insertions(+), 25 deletions(-) (limited to 'src/cryptonote_core') diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 4cfa52441..35b77832c 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -37,6 +37,7 @@ using namespace epee; #include "common/util.h" #include "common/updates.h" #include "common/download.h" +#include "common/task_region.h" #include "warnings.h" #include "crypto/crypto.h" #include "cryptonote_config.h" @@ -76,6 +77,7 @@ namespace cryptonote m_checkpoints_path(""), m_last_dns_checkpoints_update(0), m_last_json_checkpoints_update(0), + m_threadpool(tools::thread_group::optimal()), m_update_download(0) { m_checkpoints_updating.clear(); @@ -483,11 +485,9 @@ namespace cryptonote return false; } //----------------------------------------------------------------------------------------------- - bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) + bool core::handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay) { tvc = boost::value_initialized(); - //want to process all transactions sequentially - CRITICAL_REGION_LOCAL(m_incoming_tx_lock); if(tx_blob.size() > get_max_tx_size()) { @@ -497,9 +497,8 @@ namespace cryptonote return false; } - crypto::hash tx_hash = null_hash; - crypto::hash tx_prefixt_hash = null_hash; - transaction tx; + tx_hash = null_hash; + tx_prefixt_hash = null_hash; if(!parse_tx_from_blob(tx, tx_hash, tx_prefixt_hash, tx_blob)) { @@ -509,15 +508,18 @@ namespace cryptonote } //std::cout << "!"<< tx.vin.size() << std::endl; + bad_semantics_txes_lock.lock(); for (int idx = 0; idx < 2; ++idx) { if (bad_semantics_txes[idx].find(tx_hash) != bad_semantics_txes[idx].end()) { + bad_semantics_txes_lock.unlock(); LOG_PRINT_L1("Transaction already seen with bad semantics, rejected"); tvc.m_verifivation_failed = true; return false; } } + bad_semantics_txes_lock.unlock(); uint8_t version = m_blockchain_storage.get_current_hard_fork_version(); const size_t max_tx_version = version == 1 ? 1 : 2; @@ -528,18 +530,11 @@ namespace cryptonote return false; } - if(m_mempool.have_tx(tx_hash)) - { - LOG_PRINT_L2("tx " << tx_hash << "already have transaction in tx_pool"); - return true; - } - - if(m_blockchain_storage.have_tx(tx_hash)) - { - LOG_PRINT_L2("tx " << tx_hash << " already have transaction in blockchain"); - return true; - } - + return true; + } + //----------------------------------------------------------------------------------------------- + bool core::handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay) + { if(!check_tx_syntax(tx)) { LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " syntax, rejected"); @@ -568,23 +563,84 @@ namespace cryptonote { LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " semantic, rejected"); tvc.m_verifivation_failed = true; + bad_semantics_txes_lock.lock(); bad_semantics_txes[0].insert(tx_hash); if (bad_semantics_txes[0].size() >= BAD_SEMANTICS_TXES_MAX_SIZE) { std::swap(bad_semantics_txes[0], bad_semantics_txes[1]); bad_semantics_txes[0].clear(); } + bad_semantics_txes_lock.unlock(); return false; } - bool r = add_new_tx(tx, tx_hash, tx_prefixt_hash, tx_blob.size(), tvc, keeped_by_block, relayed, do_not_relay); - if(tvc.m_verifivation_failed) - {MERROR_VER("Transaction verification failed: " << tx_hash);} - else if(tvc.m_verifivation_impossible) - {MERROR_VER("Transaction verification impossible: " << tx_hash);} + return true; + } + //----------------------------------------------------------------------------------------------- + bool core::handle_incoming_txs(const std::list& tx_blobs, std::vector& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) + { + struct result { bool res; cryptonote::transaction tx; crypto::hash hash; crypto::hash prefix_hash; bool in_txpool; bool in_blockchain; }; + std::vector results(tx_blobs.size()); + + tvc.resize(tx_blobs.size()); + tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { + std::list::const_iterator it = tx_blobs.begin(); + for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { + region.run([&, i, it] { + results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); + }); + } + }); + tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { + std::list::const_iterator it = tx_blobs.begin(); + for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { + if (!results[i].res) + continue; + if(m_mempool.have_tx(results[i].hash)) + { + LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool"); + } + else if(m_blockchain_storage.have_tx(results[i].hash)) + { + LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain"); + } + else + { + region.run([&, i, it] { + results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); + }); + } + } + }); + + bool ok = true; + std::list::const_iterator it = tx_blobs.begin(); + for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { + if (!results[i].res) + { + ok = false; + continue; + } + + ok &= add_new_tx(results[i].tx, results[i].hash, results[i].prefix_hash, it->size(), tvc[i], keeped_by_block, relayed, do_not_relay); + if(tvc[i].m_verifivation_failed) + {MERROR_VER("Transaction verification failed: " << results[i].hash);} + else if(tvc[i].m_verifivation_impossible) + {MERROR_VER("Transaction verification impossible: " << results[i].hash);} - if(tvc.m_added_to_pool) - MDEBUG("tx added: " << tx_hash); + if(tvc[i].m_added_to_pool) + MDEBUG("tx added: " << results[i].hash); + } + return ok; + } + //----------------------------------------------------------------------------------------------- + bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) + { + std::list tx_blobs; + tx_blobs.push_back(tx_blob); + std::vector tvcv(1); + bool r = handle_incoming_txs(tx_blobs, tvcv, keeped_by_block, relayed, do_not_relay); + tvc = tvcv[0]; return r; } //----------------------------------------------------------------------------------------------- diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index e5fbf7f91..40ca9410b 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -40,6 +40,7 @@ #include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "storages/portable_storage_template_helper.h" #include "common/download.h" +#include "common/thread_group.h" #include "tx_pool.h" #include "blockchain.h" #include "cryptonote_basic/miner.h" @@ -114,6 +115,22 @@ namespace cryptonote */ bool handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay); + /** + * @brief handles a list of incoming transactions + * + * Parses incoming transactions and, if nothing is obviously wrong, + * passes them along to the transaction pool + * + * @param tx_blobs the txs to handle + * @param tvc metadata about the transactions' validity + * @param keeped_by_block if the transactions have been in a block + * @param relayed whether or not the transactions were relayed to us + * @param do_not_relay whether to prevent the transactions from being relayed + * + * @return true if the transactions made it to the transaction pool, otherwise false + */ + bool handle_incoming_txs(const std::list& tx_blobs, std::vector& tvc, bool keeped_by_block, bool relayed, bool do_not_relay); + /** * @brief handles an incoming block * @@ -753,6 +770,9 @@ namespace cryptonote */ bool check_tx_semantic(const transaction& tx, bool keeped_by_block) const; + bool handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay); + bool handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay); + /** * @copydoc miner::on_block_chain_update * @@ -859,6 +879,9 @@ namespace cryptonote time_t start_time; std::unordered_set bad_semantics_txes[2]; + boost::mutex bad_semantics_txes_lock; + + tools::thread_group m_threadpool; enum { UPDATES_DISABLED, -- cgit v1.2.3