aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormoneromooo-monero <moneromooo-monero@users.noreply.github.com>2017-07-10 15:38:56 +0100
committermoneromooo-monero <moneromooo-monero@users.noreply.github.com>2017-08-07 09:33:20 +0100
commit158c3ecff3a1e760b8d307c1ad29c029f669a265 (patch)
treeb7c6ebace176a8b65fc1d24c51f4f9181da0d8b2 /src
parentcryptonote_protocol: retry stale spans early (diff)
downloadmonero-158c3ecff3a1e760b8d307c1ad29c029f669a265.tar.xz
core: thread most of handle_incoming_tx
Diffstat (limited to 'src')
-rw-r--r--src/cryptonote_core/cryptonote_core.cpp106
-rw-r--r--src/cryptonote_core/cryptonote_core.h23
-rw-r--r--src/cryptonote_protocol/cryptonote_protocol_handler.inl15
3 files changed, 113 insertions, 31 deletions
diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp
index 4cfa52441..35b77832c 100644
--- a/src/cryptonote_core/cryptonote_core.cpp
+++ b/src/cryptonote_core/cryptonote_core.cpp
@@ -37,6 +37,7 @@ using namespace epee;
#include "common/util.h"
#include "common/updates.h"
#include "common/download.h"
+#include "common/task_region.h"
#include "warnings.h"
#include "crypto/crypto.h"
#include "cryptonote_config.h"
@@ -76,6 +77,7 @@ namespace cryptonote
m_checkpoints_path(""),
m_last_dns_checkpoints_update(0),
m_last_json_checkpoints_update(0),
+ m_threadpool(tools::thread_group::optimal()),
m_update_download(0)
{
m_checkpoints_updating.clear();
@@ -483,11 +485,9 @@ namespace cryptonote
return false;
}
//-----------------------------------------------------------------------------------------------
- bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
+ bool core::handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
{
tvc = boost::value_initialized<tx_verification_context>();
- //want to process all transactions sequentially
- CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
if(tx_blob.size() > get_max_tx_size())
{
@@ -497,9 +497,8 @@ namespace cryptonote
return false;
}
- crypto::hash tx_hash = null_hash;
- crypto::hash tx_prefixt_hash = null_hash;
- transaction tx;
+ tx_hash = null_hash;
+ tx_prefixt_hash = null_hash;
if(!parse_tx_from_blob(tx, tx_hash, tx_prefixt_hash, tx_blob))
{
@@ -509,15 +508,18 @@ namespace cryptonote
}
//std::cout << "!"<< tx.vin.size() << std::endl;
+ bad_semantics_txes_lock.lock();
for (int idx = 0; idx < 2; ++idx)
{
if (bad_semantics_txes[idx].find(tx_hash) != bad_semantics_txes[idx].end())
{
+ bad_semantics_txes_lock.unlock();
LOG_PRINT_L1("Transaction already seen with bad semantics, rejected");
tvc.m_verifivation_failed = true;
return false;
}
}
+ bad_semantics_txes_lock.unlock();
uint8_t version = m_blockchain_storage.get_current_hard_fork_version();
const size_t max_tx_version = version == 1 ? 1 : 2;
@@ -528,18 +530,11 @@ namespace cryptonote
return false;
}
- if(m_mempool.have_tx(tx_hash))
- {
- LOG_PRINT_L2("tx " << tx_hash << "already have transaction in tx_pool");
- return true;
- }
-
- if(m_blockchain_storage.have_tx(tx_hash))
- {
- LOG_PRINT_L2("tx " << tx_hash << " already have transaction in blockchain");
- return true;
- }
-
+ return true;
+ }
+ //-----------------------------------------------------------------------------------------------
+ bool core::handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
+ {
if(!check_tx_syntax(tx))
{
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " syntax, rejected");
@@ -568,23 +563,84 @@ namespace cryptonote
{
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " semantic, rejected");
tvc.m_verifivation_failed = true;
+ bad_semantics_txes_lock.lock();
bad_semantics_txes[0].insert(tx_hash);
if (bad_semantics_txes[0].size() >= BAD_SEMANTICS_TXES_MAX_SIZE)
{
std::swap(bad_semantics_txes[0], bad_semantics_txes[1]);
bad_semantics_txes[0].clear();
}
+ bad_semantics_txes_lock.unlock();
return false;
}
- bool r = add_new_tx(tx, tx_hash, tx_prefixt_hash, tx_blob.size(), tvc, keeped_by_block, relayed, do_not_relay);
- if(tvc.m_verifivation_failed)
- {MERROR_VER("Transaction verification failed: " << tx_hash);}
- else if(tvc.m_verifivation_impossible)
- {MERROR_VER("Transaction verification impossible: " << tx_hash);}
+ return true;
+ }
+ //-----------------------------------------------------------------------------------------------
+ bool core::handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
+ {
+ struct result { bool res; cryptonote::transaction tx; crypto::hash hash; crypto::hash prefix_hash; bool in_txpool; bool in_blockchain; };
+ std::vector<result> results(tx_blobs.size());
+
+ tvc.resize(tx_blobs.size());
+ tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
+ std::list<blobdata>::const_iterator it = tx_blobs.begin();
+ for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
+ region.run([&, i, it] {
+ results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
+ });
+ }
+ });
+ tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
+ std::list<blobdata>::const_iterator it = tx_blobs.begin();
+ for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
+ if (!results[i].res)
+ continue;
+ if(m_mempool.have_tx(results[i].hash))
+ {
+ LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool");
+ }
+ else if(m_blockchain_storage.have_tx(results[i].hash))
+ {
+ LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain");
+ }
+ else
+ {
+ region.run([&, i, it] {
+ results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
+ });
+ }
+ }
+ });
+
+ bool ok = true;
+ std::list<blobdata>::const_iterator it = tx_blobs.begin();
+ for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
+ if (!results[i].res)
+ {
+ ok = false;
+ continue;
+ }
+
+ ok &= add_new_tx(results[i].tx, results[i].hash, results[i].prefix_hash, it->size(), tvc[i], keeped_by_block, relayed, do_not_relay);
+ if(tvc[i].m_verifivation_failed)
+ {MERROR_VER("Transaction verification failed: " << results[i].hash);}
+ else if(tvc[i].m_verifivation_impossible)
+ {MERROR_VER("Transaction verification impossible: " << results[i].hash);}
- if(tvc.m_added_to_pool)
- MDEBUG("tx added: " << tx_hash);
+ if(tvc[i].m_added_to_pool)
+ MDEBUG("tx added: " << results[i].hash);
+ }
+ return ok;
+ }
+ //-----------------------------------------------------------------------------------------------
+ bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
+ {
+ std::list<cryptonote::blobdata> tx_blobs;
+ tx_blobs.push_back(tx_blob);
+ std::vector<tx_verification_context> tvcv(1);
+ bool r = handle_incoming_txs(tx_blobs, tvcv, keeped_by_block, relayed, do_not_relay);
+ tvc = tvcv[0];
return r;
}
//-----------------------------------------------------------------------------------------------
diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h
index e5fbf7f91..40ca9410b 100644
--- a/src/cryptonote_core/cryptonote_core.h
+++ b/src/cryptonote_core/cryptonote_core.h
@@ -40,6 +40,7 @@
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
#include "storages/portable_storage_template_helper.h"
#include "common/download.h"
+#include "common/thread_group.h"
#include "tx_pool.h"
#include "blockchain.h"
#include "cryptonote_basic/miner.h"
@@ -115,6 +116,22 @@ namespace cryptonote
bool handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
/**
+ * @brief handles a list of incoming transactions
+ *
+ * Parses incoming transactions and, if nothing is obviously wrong,
+ * passes them along to the transaction pool
+ *
+ * @param tx_blobs the txs to handle
+ * @param tvc metadata about the transactions' validity
+ * @param keeped_by_block if the transactions have been in a block
+ * @param relayed whether or not the transactions were relayed to us
+ * @param do_not_relay whether to prevent the transactions from being relayed
+ *
+ * @return true if the transactions made it to the transaction pool, otherwise false
+ */
+ bool handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
+
+ /**
* @brief handles an incoming block
*
* periodic update to checkpoints is triggered here
@@ -753,6 +770,9 @@ namespace cryptonote
*/
bool check_tx_semantic(const transaction& tx, bool keeped_by_block) const;
+ bool handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
+ bool handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
+
/**
* @copydoc miner::on_block_chain_update
*
@@ -859,6 +879,9 @@ namespace cryptonote
time_t start_time;
std::unordered_set<crypto::hash> bad_semantics_txes[2];
+ boost::mutex bad_semantics_txes_lock;
+
+ tools::thread_group m_threadpool;
enum {
UPDATES_DISABLED,
diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl
index 9f44a13be..9dcc8901a 100644
--- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl
+++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl
@@ -986,6 +986,7 @@ namespace cryptonote
m_core.prepare_handle_incoming_blocks(blocks);
uint64_t block_process_time_full = 0, transactions_process_time_full = 0;
+ size_t num_txs = 0;
for(const block_complete_entry& block_entry: blocks)
{
if (m_stopping)
@@ -996,15 +997,17 @@ namespace cryptonote
// process transactions
TIME_MEASURE_START(transactions_process_time);
- for(auto& tx_blob: block_entry.txs)
+ num_txs += block_entry.txs.size();
+ std::vector<tx_verification_context> tvc;
+ m_core.handle_incoming_txs(block_entry.txs, tvc, true, true, false);
+ std::list<blobdata>::const_iterator it = block_entry.txs.begin();
+ for (size_t i = 0; i < tvc.size(); ++i, ++it)
{
- tx_verification_context tvc = AUTO_VAL_INIT(tvc);
- m_core.handle_incoming_tx(tx_blob, tvc, true, true, false);
- if(tvc.m_verifivation_failed)
+ if(tvc[i].m_verifivation_failed)
{
if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{
LOG_ERROR_CCONTEXT("transaction verification failed on NOTIFY_RESPONSE_GET_OBJECTS, tx_id = "
- << epee::string_tools::pod_to_hex(get_blob_hash(tx_blob)) << ", dropping connection");
+ << epee::string_tools::pod_to_hex(get_blob_hash(*it)) << ", dropping connection");
m_p2p->drop_connection(context);
m_block_queue.flush_spans(context.m_connection_id, true);
return true;
@@ -1065,7 +1068,7 @@ namespace cryptonote
} // each download block
- LOG_PRINT_CCONTEXT_L2("Block process time (" << blocks.size() << " blocks): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
+ MCINFO("sync-info", "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
m_core.cleanup_handle_incoming_blocks();