aboutsummaryrefslogtreecommitdiff
path: root/src/cryptonote_core
diff options
context:
space:
mode:
authorHoward Chu <hyc@symas.com>2017-09-14 04:39:37 +0100
committerHoward Chu <hyc@symas.com>2017-09-14 21:42:48 +0100
commit510d0d47537aea8eff2e9c855099c4d4839cef32 (patch)
tree79d1d9d4ae9edb914ba2fc8a96e7516ff29b200d /src/cryptonote_core
parentMerge pull request #2438 (diff)
downloadmonero-510d0d47537aea8eff2e9c855099c4d4839cef32.tar.xz
Use a threadpool
Instead of constantly creating and destroying threads
Diffstat (limited to 'src/cryptonote_core')
-rw-r--r--src/cryptonote_core/blockchain.cpp72
-rw-r--r--src/cryptonote_core/cryptonote_core.cpp77
-rw-r--r--src/cryptonote_core/cryptonote_core.h4
3 files changed, 56 insertions, 97 deletions
diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp
index 69d2edf65..7226a19b2 100644
--- a/src/cryptonote_core/blockchain.cpp
+++ b/src/cryptonote_core/blockchain.cpp
@@ -45,6 +45,7 @@
#include "profile_tools.h"
#include "file_io_utils.h"
#include "common/int-util.h"
+#include "common/threadpool.h"
#include "common/boost_serialization_helper.h"
#include "warnings.h"
#include "crypto/hash.h"
@@ -2513,33 +2514,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
std::vector < uint64_t > results;
results.resize(tx.vin.size(), 0);
- int threads = tools::get_max_concurrency();
-
- boost::asio::io_service ioservice;
- boost::thread_group threadpool;
- bool ioservice_active = false;
-
- std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
- if(threads > 1)
- {
- for (int i = 0; i < threads; i++)
- {
- threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
- }
- ioservice_active = true;
- }
-
-#define KILL_IOSERVICE() \
- if(ioservice_active) \
- { \
- work.reset(); \
- while (!ioservice.stopped()) ioservice.poll(); \
- threadpool.join_all(); \
- ioservice.stop(); \
- ioservice_active = false; \
- }
-
- epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ tools::threadpool::waiter waiter;
+ int threads = tpool.get_max_concurrency();
for (const auto& txin : tx.vin)
{
@@ -2600,7 +2577,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
{
// ND: Speedup
// 1. Thread ring signature verification if possible.
- ioservice.dispatch(boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
+ tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
}
else
{
@@ -2623,8 +2600,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
sig_index++;
}
-
- KILL_IOSERVICE();
+ if (tx.version == 1 && threads > 1)
+ waiter.wait();
if (tx.version == 1)
{
@@ -3699,7 +3676,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
return true;
bool blocks_exist = false;
- uint64_t threads = tools::get_max_concurrency();
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ uint64_t threads = tpool.get_max_concurrency();
if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1)
{
@@ -3708,15 +3686,12 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
threads = m_max_prepare_blocks_threads;
uint64_t height = m_db->height();
- std::vector<boost::thread *> thread_list;
int batches = blocks_entry.size() / threads;
int extra = blocks_entry.size() % threads;
MDEBUG("block_batches: " << batches);
std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads);
std::vector < std::vector < block >> blocks(threads);
auto it = blocks_entry.begin();
- boost::thread::attributes attrs;
- attrs.set_stack_size(THREAD_STACK_SIZE);
for (uint64_t i = 0; i < threads; i++)
{
@@ -3775,19 +3750,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
{
m_blocks_longhash_table.clear();
uint64_t thread_height = height;
+ tools::threadpool::waiter waiter;
for (uint64_t i = 0; i < threads; i++)
{
- thread_list.push_back(new boost::thread(attrs, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i]))));
+ tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])));
thread_height += blocks[i].size();
}
- for (size_t j = 0; j < thread_list.size(); j++)
- {
- thread_list[j]->join();
- delete thread_list[j];
- }
-
- thread_list.clear();
+ waiter.wait();
if (m_cancel)
return false;
@@ -3911,30 +3881,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
// [output] stores all transactions for each tx_out_index::hash found
std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size());
- threads = tools::get_max_concurrency();
+ threads = tpool.get_max_concurrency();
if (!m_db->can_thread_bulk_indices())
threads = 1;
if (threads > 1)
{
- boost::asio::io_service ioservice;
- boost::thread_group threadpool;
- std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
-
- for (uint64_t i = 0; i < threads; i++)
- {
- threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
- }
+ tools::threadpool::waiter waiter;
for (size_t i = 0; i < amounts.size(); i++)
{
uint64_t amount = amounts[i];
- ioservice.dispatch(boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i])));
+ tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i])));
}
-
- work.reset();
- threadpool.join_all();
- ioservice.stop();
+ waiter.wait();
}
else
{
diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp
index c3aeb15f0..903e9661c 100644
--- a/src/cryptonote_core/cryptonote_core.cpp
+++ b/src/cryptonote_core/cryptonote_core.cpp
@@ -37,7 +37,7 @@ using namespace epee;
#include "common/util.h"
#include "common/updates.h"
#include "common/download.h"
-#include "common/task_region.h"
+#include "common/threadpool.h"
#include "warnings.h"
#include "crypto/crypto.h"
#include "cryptonote_config.h"
@@ -74,7 +74,7 @@ namespace cryptonote
m_last_dns_checkpoints_update(0),
m_last_json_checkpoints_update(0),
m_disable_dns_checkpoints(false),
- m_threadpool(tools::thread_group::optimal()),
+ m_threadpool(tools::threadpool::getInstance()),
m_update_download(0)
{
m_checkpoints_updating.clear();
@@ -591,54 +591,53 @@ namespace cryptonote
std::vector<result> results(tx_blobs.size());
tvc.resize(tx_blobs.size());
- tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
- std::list<blobdata>::const_iterator it = tx_blobs.begin();
- for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
- region.run([&, i, it] {
+ tools::threadpool::waiter waiter;
+ std::list<blobdata>::const_iterator it = tx_blobs.begin();
+ for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
+ m_threadpool.submit(&waiter, [&, i, it] {
+ try
+ {
+ results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
+ }
+ catch (const std::exception &e)
+ {
+ MERROR_VER("Exception in handle_incoming_tx_pre: " << e.what());
+ results[i].res = false;
+ }
+ });
+ }
+ waiter.wait();
+ it = tx_blobs.begin();
+ for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
+ if (!results[i].res)
+ continue;
+ if(m_mempool.have_tx(results[i].hash))
+ {
+ LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool");
+ }
+ else if(m_blockchain_storage.have_tx(results[i].hash))
+ {
+ LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain");
+ }
+ else
+ {
+ m_threadpool.submit(&waiter, [&, i, it] {
try
{
- results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
+ results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
}
catch (const std::exception &e)
{
- MERROR_VER("Exception in handle_incoming_tx_pre: " << e.what());
+ MERROR_VER("Exception in handle_incoming_tx_post: " << e.what());
results[i].res = false;
}
});
}
- });
- tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
- std::list<blobdata>::const_iterator it = tx_blobs.begin();
- for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
- if (!results[i].res)
- continue;
- if(m_mempool.have_tx(results[i].hash))
- {
- LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool");
- }
- else if(m_blockchain_storage.have_tx(results[i].hash))
- {
- LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain");
- }
- else
- {
- region.run([&, i, it] {
- try
- {
- results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
- }
- catch (const std::exception &e)
- {
- MERROR_VER("Exception in handle_incoming_tx_post: " << e.what());
- results[i].res = false;
- }
- });
- }
- }
- });
+ }
+ waiter.wait();
bool ok = true;
- std::list<blobdata>::const_iterator it = tx_blobs.begin();
+ it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
if (!results[i].res)
{
diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h
index 8e17569a9..0a64b4b4a 100644
--- a/src/cryptonote_core/cryptonote_core.h
+++ b/src/cryptonote_core/cryptonote_core.h
@@ -40,7 +40,7 @@
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
#include "storages/portable_storage_template_helper.h"
#include "common/download.h"
-#include "common/thread_group.h"
+#include "common/threadpool.h"
#include "tx_pool.h"
#include "blockchain.h"
#include "cryptonote_basic/miner.h"
@@ -940,7 +940,7 @@ namespace cryptonote
std::unordered_set<crypto::hash> bad_semantics_txes[2];
boost::mutex bad_semantics_txes_lock;
- tools::thread_group m_threadpool;
+ tools::threadpool& m_threadpool;
enum {
UPDATES_DISABLED,