diff options
Diffstat (limited to 'src/cryptonote_core/blockchain.cpp')
-rw-r--r-- | src/cryptonote_core/blockchain.cpp | 72 |
1 files changed, 16 insertions, 56 deletions
diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index 2330b6c42..619dbdc07 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -45,6 +45,7 @@ #include "profile_tools.h" #include "file_io_utils.h" #include "common/int-util.h" +#include "common/threadpool.h" #include "common/boost_serialization_helper.h" #include "warnings.h" #include "crypto/hash.h" @@ -2563,33 +2564,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, std::vector < uint64_t > results; results.resize(tx.vin.size(), 0); - int threads = tools::get_max_concurrency(); - - boost::asio::io_service ioservice; - boost::thread_group threadpool; - bool ioservice_active = false; - - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - if(threads > 1) - { - for (int i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } - ioservice_active = true; - } - -#define KILL_IOSERVICE() \ - if(ioservice_active) \ - { \ - work.reset(); \ - while (!ioservice.stopped()) ioservice.poll(); \ - threadpool.join_all(); \ - ioservice.stop(); \ - ioservice_active = false; \ - } - - epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); }); + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; + int threads = tpool.get_max_concurrency(); for (const auto& txin : tx.vin) { @@ -2650,7 +2627,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, { // ND: Speedup // 1. Thread ring signature verification if possible. - ioservice.dispatch(boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); + tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); } else { @@ -2673,8 +2650,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, sig_index++; } - - KILL_IOSERVICE(); + if (tx.version == 1 && threads > 1) + waiter.wait(); if (tx.version == 1) { @@ -3749,7 +3726,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e return true; bool blocks_exist = false; - uint64_t threads = tools::get_max_concurrency(); + tools::threadpool& tpool = tools::threadpool::getInstance(); + uint64_t threads = tpool.get_max_concurrency(); if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1) { @@ -3758,15 +3736,12 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e threads = m_max_prepare_blocks_threads; uint64_t height = m_db->height(); - std::vector<boost::thread *> thread_list; int batches = blocks_entry.size() / threads; int extra = blocks_entry.size() % threads; MDEBUG("block_batches: " << batches); std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads); std::vector < std::vector < block >> blocks(threads); auto it = blocks_entry.begin(); - boost::thread::attributes attrs; - attrs.set_stack_size(THREAD_STACK_SIZE); for (uint64_t i = 0; i < threads; i++) { @@ -3825,19 +3800,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e { m_blocks_longhash_table.clear(); uint64_t thread_height = height; + tools::threadpool::waiter waiter; for (uint64_t i = 0; i < threads; i++) { - thread_list.push_back(new boost::thread(attrs, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])))); + tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i]))); thread_height += blocks[i].size(); } - for (size_t j = 0; j < thread_list.size(); j++) - { - thread_list[j]->join(); - delete thread_list[j]; - } - - thread_list.clear(); + waiter.wait(); if (m_cancel) return false; @@ -3961,30 +3931,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e // [output] stores all transactions for each tx_out_index::hash found std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size()); - threads = tools::get_max_concurrency(); + threads = tpool.get_max_concurrency(); if (!m_db->can_thread_bulk_indices()) threads = 1; if (threads > 1) { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - - for (uint64_t i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } + tools::threadpool::waiter waiter; for (size_t i = 0; i < amounts.size(); i++) { uint64_t amount = amounts[i]; - ioservice.dispatch(boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i]))); + tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i]))); } - - work.reset(); - threadpool.join_all(); - ioservice.stop(); + waiter.wait(); } else { |