aboutsummaryrefslogtreecommitdiff
path: root/src/wallet
diff options
context:
space:
mode:
authorRiccardo Spagni <ric@spagni.net>2017-09-18 13:19:26 +0200
committerRiccardo Spagni <ric@spagni.net>2017-09-18 13:19:26 +0200
commit1a73843ceca12932e57f57caf66033c69f8c0d1f (patch)
treef210349b2a9b5343876556c4ed379444de9c0fda /src/wallet
parentMerge pull request #2416 (diff)
parentTweak concurrency limits (diff)
downloadmonero-1a73843ceca12932e57f57caf66033c69f8c0d1f.tar.xz
Merge pull request #2446
6d0ca7d1 Tweak concurrency limits (Howard Chu) 510d0d47 Use a threadpool (Howard Chu)
Diffstat (limited to 'src/wallet')
-rw-r--r--src/wallet/wallet2.cpp63
1 files changed, 20 insertions, 43 deletions
diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp
index 323a3a7fe..f72d281c7 100644
--- a/src/wallet/wallet2.cpp
+++ b/src/wallet/wallet2.cpp
@@ -45,6 +45,7 @@ using namespace epee;
#include "cryptonote_basic/cryptonote_basic_impl.h"
#include "common/boost_serialization_helper.h"
#include "common/command_line.h"
+#include "common/threadpool.h"
#include "profile_tools.h"
#include "crypto/crypto.h"
#include "serialization/binary_utils.h"
@@ -89,14 +90,6 @@ using namespace cryptonote;
#define SECOND_OUTPUT_RELATEDNESS_THRESHOLD 0.0f
-#define KILL_IOSERVICE() \
- do { \
- work.reset(); \
- while (!ioservice.stopped()) ioservice.poll(); \
- threadpool.join_all(); \
- ioservice.stop(); \
- } while(0)
-
#define KEY_IMAGE_EXPORT_FILE_MAGIC "Monero key image export\002"
namespace
@@ -684,7 +677,9 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
std::deque<crypto::key_image> ki(tx.vout.size());
std::deque<uint64_t> amount(tx.vout.size());
std::deque<rct::key> mask(tx.vout.size());
- int threads = tools::get_max_concurrency();
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ tools::threadpool::waiter waiter;
+ int threads = tpool.get_max_concurrency();
const cryptonote::account_keys& keys = m_account.get_keys();
crypto::key_derivation derivation;
generate_key_derivation(tx_pub_key, keys.m_view_secret_key, derivation);
@@ -720,13 +715,6 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
++num_vouts_received;
// process the other outs from that tx
- boost::asio::io_service ioservice;
- boost::thread_group threadpool;
- std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
- for (int i = 0; i < threads; i++)
- {
- threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
- }
std::vector<uint64_t> money_transfered(tx.vout.size());
std::deque<bool> error(tx.vout.size());
@@ -734,10 +722,10 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
// the first one was already checked
for (size_t i = 1; i < tx.vout.size(); ++i)
{
- ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
+ tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i])));
}
- KILL_IOSERVICE();
+ waiter.wait();
for (size_t i = 1; i < tx.vout.size(); ++i)
{
if (error[i])
@@ -766,23 +754,18 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
}
else if (tx.vout.size() > 1 && threads > 1)
{
- boost::asio::io_service ioservice;
- boost::thread_group threadpool;
- std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
- for (int i = 0; i < threads; i++)
- {
- threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
- }
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ tools::threadpool::waiter waiter;
std::vector<uint64_t> money_transfered(tx.vout.size());
std::deque<bool> error(tx.vout.size());
std::deque<bool> received(tx.vout.size());
for (size_t i = 0; i < tx.vout.size(); ++i)
{
- ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
+ tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i])));
}
- KILL_IOSERVICE();
+ waiter.wait();
tx_money_got_in_outs = 0;
for (size_t i = 0; i < tx.vout.size(); ++i)
{
@@ -1251,7 +1234,8 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch");
- int threads = tools::get_max_concurrency();
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ int threads = tpool.get_max_concurrency();
if (threads > 1)
{
std::vector<crypto::hash> round_block_hashes(threads);
@@ -1262,23 +1246,16 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::
for (size_t b = 0; b < blocks_size; b += threads)
{
size_t round_size = std::min((size_t)threads, blocks_size - b);
-
- boost::asio::io_service ioservice;
- boost::thread_group threadpool;
- std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
- for (size_t i = 0; i < round_size; i++)
- {
- threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
- }
+ tools::threadpool::waiter waiter;
std::list<block_complete_entry>::const_iterator tmpblocki = blocki;
for (size_t i = 0; i < round_size; ++i)
{
- ioservice.dispatch(boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block),
+ tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block),
std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i])));
++tmpblocki;
}
- KILL_IOSERVICE();
+ waiter.wait();
tmpblocki = blocki;
for (size_t i = 0; i < round_size; ++i)
{
@@ -1698,7 +1675,8 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
size_t try_count = 0;
crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash;
std::list<crypto::hash> short_chain_history;
- boost::thread pull_thread;
+ tools::threadpool& tpool = tools::threadpool::getInstance();
+ tools::threadpool::waiter waiter;
uint64_t blocks_start_height;
std::list<cryptonote::block_complete_entry> blocks;
std::vector<COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices;
@@ -1736,11 +1714,11 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
std::list<cryptonote::block_complete_entry> next_blocks;
std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> next_o_indices;
bool error = false;
- pull_thread = boost::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);});
+ tpool.submit(&waiter, [&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);});
process_blocks(blocks_start_height, blocks, o_indices, added_blocks);
blocks_fetched += added_blocks;
- pull_thread.join();
+ waiter.wait();
if(blocks_start_height == next_blocks_start_height)
{
m_node_rpc_proxy.set_height(m_blockchain.size());
@@ -1762,8 +1740,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
catch (const std::exception&)
{
blocks_fetched += added_blocks;
- if (pull_thread.joinable())
- pull_thread.join();
+ waiter.wait();
if(try_count < 3)
{
LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")...");