diff options
author | Riccardo Spagni <ric@spagni.net> | 2015-11-30 00:17:31 +0200 |
---|---|---|
committer | Riccardo Spagni <ric@spagni.net> | 2015-11-30 00:17:34 +0200 |
commit | 5c3b29792cfa7727e575ccf6947ce8d4a21ffccc (patch) | |
tree | b0e50b22ef1409b08a1a07ce2e262512bbca98c7 | |
parent | Merge pull request #503 (diff) | |
parent | wallet2: speed up wallet refresh for large miners (diff) | |
download | monero-5c3b29792cfa7727e575ccf6947ce8d4a21ffccc.tar.xz |
Merge pull request #504
9ee48e9 wallet2: speed up wallet refresh for large miners (moneromooo-monero)
4905903 wallet2: parallelize pulling blocks and processing them on refresh (moneromooo-monero)
d0eaf1d wallet2: maintain the short chain manually when refreshing (moneromooo-monero)
a4e9506 wallet2: split pull blocks between pulling and processing (moneromooo-monero)
-rw-r--r-- | src/wallet/wallet2.cpp | 134 | ||||
-rw-r--r-- | src/wallet/wallet2.h | 8 |
2 files changed, 117 insertions, 25 deletions
diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index a6ac860c7..6136480d6 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -203,31 +203,64 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_ tx_pub_key = pub_key_field.pub_key; bool r = true; - int threads; + int threads = std::thread::hardware_concurrency(); if (miner_tx && m_refresh_type == RefreshNoCoinbase) { // assume coinbase isn't for us } else if (miner_tx && m_refresh_type == RefreshOptimizeCoinbase) { - for (size_t i = 0; i < tx.vout.size(); ++i) + uint64_t money_transfered = 0; + bool error = false; + check_acc_out(m_account.get_keys(), tx.vout[0], tx_pub_key, 0, money_transfered, error); + if (error) { - uint64_t money_transfered = 0; - bool error = false; - check_acc_out(m_account.get_keys(), tx.vout[i], tx_pub_key, i, money_transfered, error); - if (error) + r = false; + } + else + { + // this assumes that the miner tx pays a single address + if (money_transfered > 0) { - r = false; - break; + outs.push_back(0); + tx_money_got_in_outs = money_transfered; + + // process the other outs from that tx + boost::asio::io_service ioservice; + boost::thread_group threadpool; + std::auto_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); + for (int i = 0; i < threads; i++) + { + threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); + } + + const account_keys &keys = m_account.get_keys(); + std::vector<uint64_t> money_transfered(tx.vout.size()); + std::deque<bool> error(tx.vout.size()); + // the first one was already checked + for (size_t i = 1; i < tx.vout.size(); ++i) + { + ioservice.dispatch(boost::bind(&wallet2::check_acc_out, this, std::cref(keys), std::cref(tx.vout[i]), std::cref(tx_pub_key), i, + std::ref(money_transfered[i]), std::ref(error[i]))); + } + KILL_IOSERVICE(); + for (size_t i = 1; i < tx.vout.size(); ++i) + { + if (error[i]) + { + r = false; + break; + } + if (money_transfered[i]) + { + outs.push_back(i); + tx_money_got_in_outs += money_transfered[i]; + } + } } - // this assumes that the miner tx pays a single address - if (money_transfered == 0) - break; - outs.push_back(i); - tx_money_got_in_outs += money_transfered; } } - else if (tx.vout.size() > 1 && (threads = std::thread::hardware_concurrency()) > 1) + else if (tx.vout.size() > 1 && threads > 1) { boost::asio::io_service ioservice; boost::thread_group threadpool; @@ -274,7 +307,9 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_ cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res = AUTO_VAL_INIT(res); req.txid = get_transaction_hash(tx); + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/get_o_indexes.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_out_indices_error, res.status); @@ -478,19 +513,28 @@ void wallet2::parse_block_round(const cryptonote::blobdata &blob, cryptonote::bl bl_id = get_block_hash(bl); } //---------------------------------------------------------------------------------------------------- -void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) +void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::list<cryptonote::block_complete_entry> &blocks) { - blocks_added = 0; cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response res = AUTO_VAL_INIT(res); - get_short_chain_history(req.block_ids); + req.block_ids = short_chain_history; + req.start_height = start_height; + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status); - size_t current_index = res.start_height; + blocks_start_height = res.start_height; + blocks = res.blocks; +} +//---------------------------------------------------------------------------------------------------- +void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::block_complete_entry> &blocks, uint64_t& blocks_added) +{ + size_t current_index = start_height; + blocks_added = 0; int threads = std::thread::hardware_concurrency(); if (threads > 1) @@ -498,7 +542,6 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) std::vector<crypto::hash> round_block_hashes(threads); std::vector<cryptonote::block> round_blocks(threads); std::deque<bool> error(threads); - const std::list<block_complete_entry> &blocks = res.blocks; size_t blocks_size = blocks.size(); std::list<block_complete_entry>::const_iterator blocki = blocks.begin(); for (size_t b = 0; b < blocks_size; b += threads) @@ -559,10 +602,10 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) } else { - BOOST_FOREACH(auto& bl_entry, res.blocks) + BOOST_FOREACH(auto& bl_entry, blocks) { cryptonote::block bl; - r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl); + bool r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl); THROW_WALLET_EXCEPTION_IF(!r, error::block_parse_error, bl_entry.block); crypto::hash bl_id = get_block_hash(bl); @@ -574,9 +617,9 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) else if(bl_id != m_blockchain[current_index]) { //split detected here !!! - THROW_WALLET_EXCEPTION_IF(current_index == res.start_height, error::wallet_internal_error, + THROW_WALLET_EXCEPTION_IF(current_index == start_height, error::wallet_internal_error, "wrong daemon response: split starts from the first block in response " + string_tools::pod_to_hex(bl_id) + - " (height " + std::to_string(res.start_height) + "), local block id at this height: " + + " (height " + std::to_string(start_height) + "), local block id at this height: " + string_tools::pod_to_hex(m_blockchain[current_index])); detach_blockchain(current_index); @@ -604,6 +647,23 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched) refresh(start_height, blocks_fetched, received_money); } //---------------------------------------------------------------------------------------------------- +void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::list<cryptonote::block_complete_entry> &prev_blocks, std::list<cryptonote::block_complete_entry> &blocks) +{ + // prepend the last 3 blocks, should be enough to guard against a block or two's reorg + cryptonote::block bl; + std::list<cryptonote::block_complete_entry>::const_reverse_iterator i = prev_blocks.rbegin(); + for (size_t n = 0; n < std::min((size_t)3, prev_blocks.size()); ++n) + { + bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); + THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); + short_chain_history.push_front(cryptonote::get_block_hash(bl)); + ++i; + } + + // pull the new blocks + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); +} +//---------------------------------------------------------------------------------------------------- void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& received_money) { received_money = false; @@ -611,15 +671,33 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re uint64_t added_blocks = 0; size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash; + std::list<crypto::hash> short_chain_history; + std::thread pull_thread; + uint64_t blocks_start_height; + std::list<cryptonote::block_complete_entry> blocks; + + // pull the first set of blocks + get_short_chain_history(short_chain_history); + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); while(m_run.load(std::memory_order_relaxed)) { try { - pull_blocks(start_height, added_blocks); + // pull the next set of blocks while we're processing the current one + uint64_t next_blocks_start_height; + std::list<cryptonote::block_complete_entry> next_blocks; + pull_thread = std::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks);}); + + process_blocks(blocks_start_height, blocks, added_blocks); blocks_fetched += added_blocks; + pull_thread.join(); if(!added_blocks) break; + + // switch to the new blocks from the daemon + blocks_start_height = next_blocks_start_height; + blocks = next_blocks; } catch (const std::exception&) { @@ -1050,6 +1128,8 @@ bool wallet2::prepare_file_names(const std::string& file_path) //---------------------------------------------------------------------------------------------------- bool wallet2::check_connection() { + std::lock_guard<std::mutex> lock(m_daemon_rpc_mutex); + if(m_http_client.is_connected()) return true; @@ -1262,7 +1342,9 @@ void wallet2::rescan_spent() COMMAND_RPC_IS_KEY_IMAGE_SPENT::request req = AUTO_VAL_INIT(req); COMMAND_RPC_IS_KEY_IMAGE_SPENT::response daemon_resp = AUTO_VAL_INIT(daemon_resp); req.key_images = key_images; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/is_key_image_spent", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::is_key_image_spent_error, daemon_resp.status); @@ -1569,7 +1651,9 @@ void wallet2::commit_tx(pending_tx& ptx) COMMAND_RPC_SEND_RAW_TX::request req; req.tx_as_hex = epee::string_tools::buff_to_hex_nodelimer(tx_to_blob(ptx.tx)); COMMAND_RPC_SEND_RAW_TX::response daemon_send_resp; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/sendrawtransaction", req, daemon_send_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status != CORE_RPC_STATUS_OK, error::tx_rejected, ptx.tx, daemon_send_resp.status); @@ -1757,7 +1841,9 @@ void wallet2::transfer_selected(const std::vector<cryptonote::tx_destination_ent req.amounts.push_back(it->amount()); } + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); diff --git a/src/wallet/wallet2.h b/src/wallet/wallet2.h index e036020b8..b71697c1f 100644 --- a/src/wallet/wallet2.h +++ b/src/wallet/wallet2.h @@ -355,7 +355,9 @@ namespace tools bool is_tx_spendtime_unlocked(uint64_t unlock_time) const; bool is_transfer_unlocked(const transfer_details& td) const; bool clear(); - void pull_blocks(uint64_t start_height, uint64_t& blocks_added); + void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::list<cryptonote::block_complete_entry> &blocks); + void pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::list<cryptonote::block_complete_entry> &prev_blocks, std::list<cryptonote::block_complete_entry> &blocks); + void process_blocks(uint64_t start_height, const std::list<cryptonote::block_complete_entry> &blocks, uint64_t& blocks_added); uint64_t select_transfers(uint64_t needed_money, bool add_dust, uint64_t dust, std::list<transfer_container::iterator>& selected_transfers); bool prepare_file_names(const std::string& file_path); void process_unconfirmed(const cryptonote::transaction& tx, uint64_t height); @@ -387,6 +389,8 @@ namespace tools std::atomic<bool> m_run; + std::mutex m_daemon_rpc_mutex; + i_wallet2_callback* m_callback; bool m_testnet; bool m_restricted; @@ -563,7 +567,9 @@ namespace tools req.amounts.push_back(it->amount()); } + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); |