From a4e950606916bce12d83076a912ac9b965e5465c Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Fri, 27 Nov 2015 00:03:43 +0000 Subject: wallet2: split pull blocks between pulling and processing --- src/wallet/wallet2.cpp | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) (limited to 'src/wallet/wallet2.cpp') diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index a6ac860c7..aeea34010 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -478,19 +478,26 @@ void wallet2::parse_block_round(const cryptonote::blobdata &blob, cryptonote::bl bl_id = get_block_hash(bl); } //---------------------------------------------------------------------------------------------------- -void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) +void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &blocks) { - blocks_added = 0; cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response res = AUTO_VAL_INIT(res); get_short_chain_history(req.block_ids); + req.start_height = start_height; bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status); - size_t current_index = res.start_height; + blocks_start_height = res.start_height; + blocks = res.blocks; +} +//---------------------------------------------------------------------------------------------------- +void wallet2::process_blocks(uint64_t start_height, const std::list &blocks, uint64_t& blocks_added) +{ + size_t current_index = start_height; + blocks_added = 0; int threads = std::thread::hardware_concurrency(); if (threads > 1) @@ -498,7 +505,6 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) std::vector round_block_hashes(threads); std::vector round_blocks(threads); std::deque error(threads); - const std::list &blocks = res.blocks; size_t blocks_size = blocks.size(); std::list::const_iterator blocki = blocks.begin(); for (size_t b = 0; b < blocks_size; b += threads) @@ -559,10 +565,10 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) } else { - BOOST_FOREACH(auto& bl_entry, res.blocks) + BOOST_FOREACH(auto& bl_entry, blocks) { cryptonote::block bl; - r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl); + bool r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl); THROW_WALLET_EXCEPTION_IF(!r, error::block_parse_error, bl_entry.block); crypto::hash bl_id = get_block_hash(bl); @@ -574,9 +580,9 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) else if(bl_id != m_blockchain[current_index]) { //split detected here !!! - THROW_WALLET_EXCEPTION_IF(current_index == res.start_height, error::wallet_internal_error, + THROW_WALLET_EXCEPTION_IF(current_index == start_height, error::wallet_internal_error, "wrong daemon response: split starts from the first block in response " + string_tools::pod_to_hex(bl_id) + - " (height " + std::to_string(res.start_height) + "), local block id at this height: " + + " (height " + std::to_string(start_height) + "), local block id at this height: " + string_tools::pod_to_hex(m_blockchain[current_index])); detach_blockchain(current_index); @@ -616,7 +622,10 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re { try { - pull_blocks(start_height, added_blocks); + uint64_t blocks_start_height; + std::list blocks; + pull_blocks(start_height, blocks_start_height, blocks); + process_blocks(blocks_start_height, blocks, added_blocks); blocks_fetched += added_blocks; if(!added_blocks) break; -- cgit v1.2.3 From d0eaf1d4e155ee55bc3664a80f991f27d43cf71f Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Fri, 27 Nov 2015 00:35:41 +0000 Subject: wallet2: maintain the short chain manually when refreshing --- src/wallet/wallet2.cpp | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) (limited to 'src/wallet/wallet2.cpp') diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index aeea34010..69b863c3a 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -478,11 +478,11 @@ void wallet2::parse_block_round(const cryptonote::blobdata &blob, cryptonote::bl bl_id = get_block_hash(bl); } //---------------------------------------------------------------------------------------------------- -void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &blocks) +void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, const std::list &short_chain_history, std::list &blocks) { cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response res = AUTO_VAL_INIT(res); - get_short_chain_history(req.block_ids); + req.block_ids = short_chain_history; req.start_height = start_height; bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); @@ -617,18 +617,31 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re uint64_t added_blocks = 0; size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash; + std::list short_chain_history; + get_short_chain_history(short_chain_history); while(m_run.load(std::memory_order_relaxed)) { try { uint64_t blocks_start_height; std::list blocks; - pull_blocks(start_height, blocks_start_height, blocks); + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); process_blocks(blocks_start_height, blocks, added_blocks); blocks_fetched += added_blocks; if(!added_blocks) break; + cryptonote::block bl; + + // prepend the last 3 blocks, should be enough to guard against a block or two's reorg + std::list::const_reverse_iterator i = blocks.rbegin(); + for (int n = 0; n < 3; ++n) + { + bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); + THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); + short_chain_history.push_front(cryptonote::get_block_hash(bl)); + ++i; + } } catch (const std::exception&) { -- cgit v1.2.3 From 490590306efdd60a4663967c1c0a95d4acb4b886 Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Fri, 27 Nov 2015 17:25:15 +0000 Subject: wallet2: parallelize pulling blocks and processing them on refresh This needed locking the use of m_http_client, to avoid collisions in I/O. --- src/wallet/wallet2.cpp | 57 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 13 deletions(-) (limited to 'src/wallet/wallet2.cpp') diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index 69b863c3a..f79bad474 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -274,7 +274,9 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_ cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res = AUTO_VAL_INIT(res); req.txid = get_transaction_hash(tx); + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/get_o_indexes.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_out_indices_error, res.status); @@ -485,7 +487,9 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, req.block_ids = short_chain_history; req.start_height = start_height; + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status); @@ -610,6 +614,23 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched) refresh(start_height, blocks_fetched, received_money); } //---------------------------------------------------------------------------------------------------- +void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::list &prev_blocks, std::list &blocks) +{ + // prepend the last 3 blocks, should be enough to guard against a block or two's reorg + cryptonote::block bl; + std::list::const_reverse_iterator i = prev_blocks.rbegin(); + for (size_t n = 0; n < std::min((size_t)3, prev_blocks.size()); ++n) + { + bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); + THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); + short_chain_history.push_front(cryptonote::get_block_hash(bl)); + ++i; + } + + // pull the new blocks + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); +} +//---------------------------------------------------------------------------------------------------- void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& received_money) { received_money = false; @@ -618,30 +639,32 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash; std::list short_chain_history; + std::thread pull_thread; + uint64_t blocks_start_height; + std::list blocks; + // pull the first set of blocks get_short_chain_history(short_chain_history); + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); + while(m_run.load(std::memory_order_relaxed)) { try { - uint64_t blocks_start_height; - std::list blocks; - pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); + // pull the next set of blocks while we're processing the current one + uint64_t next_blocks_start_height; + std::list next_blocks; + pull_thread = std::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks);}); + process_blocks(blocks_start_height, blocks, added_blocks); blocks_fetched += added_blocks; + pull_thread.join(); if(!added_blocks) break; - cryptonote::block bl; - // prepend the last 3 blocks, should be enough to guard against a block or two's reorg - std::list::const_reverse_iterator i = blocks.rbegin(); - for (int n = 0; n < 3; ++n) - { - bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); - THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); - short_chain_history.push_front(cryptonote::get_block_hash(bl)); - ++i; - } + // switch to the new blocks from the daemon + blocks_start_height = next_blocks_start_height; + blocks = next_blocks; } catch (const std::exception&) { @@ -1072,6 +1095,8 @@ bool wallet2::prepare_file_names(const std::string& file_path) //---------------------------------------------------------------------------------------------------- bool wallet2::check_connection() { + std::lock_guard lock(m_daemon_rpc_mutex); + if(m_http_client.is_connected()) return true; @@ -1284,7 +1309,9 @@ void wallet2::rescan_spent() COMMAND_RPC_IS_KEY_IMAGE_SPENT::request req = AUTO_VAL_INIT(req); COMMAND_RPC_IS_KEY_IMAGE_SPENT::response daemon_resp = AUTO_VAL_INIT(daemon_resp); req.key_images = key_images; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/is_key_image_spent", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::is_key_image_spent_error, daemon_resp.status); @@ -1591,7 +1618,9 @@ void wallet2::commit_tx(pending_tx& ptx) COMMAND_RPC_SEND_RAW_TX::request req; req.tx_as_hex = epee::string_tools::buff_to_hex_nodelimer(tx_to_blob(ptx.tx)); COMMAND_RPC_SEND_RAW_TX::response daemon_send_resp; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/sendrawtransaction", req, daemon_send_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status != CORE_RPC_STATUS_OK, error::tx_rejected, ptx.tx, daemon_send_resp.status); @@ -1779,7 +1808,9 @@ void wallet2::transfer_selected(const std::vectoramount()); } + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); -- cgit v1.2.3 From 9ee48e9071e942738f3c4dc57820e7dddf7cdd4c Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Sat, 28 Nov 2015 19:05:49 +0000 Subject: wallet2: speed up wallet refresh for large miners --- src/wallet/wallet2.cpp | 61 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 14 deletions(-) (limited to 'src/wallet/wallet2.cpp') diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index f79bad474..6136480d6 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -203,31 +203,64 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_ tx_pub_key = pub_key_field.pub_key; bool r = true; - int threads; + int threads = std::thread::hardware_concurrency(); if (miner_tx && m_refresh_type == RefreshNoCoinbase) { // assume coinbase isn't for us } else if (miner_tx && m_refresh_type == RefreshOptimizeCoinbase) { - for (size_t i = 0; i < tx.vout.size(); ++i) + uint64_t money_transfered = 0; + bool error = false; + check_acc_out(m_account.get_keys(), tx.vout[0], tx_pub_key, 0, money_transfered, error); + if (error) + { + r = false; + } + else { - uint64_t money_transfered = 0; - bool error = false; - check_acc_out(m_account.get_keys(), tx.vout[i], tx_pub_key, i, money_transfered, error); - if (error) + // this assumes that the miner tx pays a single address + if (money_transfered > 0) { - r = false; - break; + outs.push_back(0); + tx_money_got_in_outs = money_transfered; + + // process the other outs from that tx + boost::asio::io_service ioservice; + boost::thread_group threadpool; + std::auto_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); + for (int i = 0; i < threads; i++) + { + threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); + } + + const account_keys &keys = m_account.get_keys(); + std::vector money_transfered(tx.vout.size()); + std::deque error(tx.vout.size()); + // the first one was already checked + for (size_t i = 1; i < tx.vout.size(); ++i) + { + ioservice.dispatch(boost::bind(&wallet2::check_acc_out, this, std::cref(keys), std::cref(tx.vout[i]), std::cref(tx_pub_key), i, + std::ref(money_transfered[i]), std::ref(error[i]))); + } + KILL_IOSERVICE(); + for (size_t i = 1; i < tx.vout.size(); ++i) + { + if (error[i]) + { + r = false; + break; + } + if (money_transfered[i]) + { + outs.push_back(i); + tx_money_got_in_outs += money_transfered[i]; + } + } } - // this assumes that the miner tx pays a single address - if (money_transfered == 0) - break; - outs.push_back(i); - tx_money_got_in_outs += money_transfered; } } - else if (tx.vout.size() > 1 && (threads = std::thread::hardware_concurrency()) > 1) + else if (tx.vout.size() > 1 && threads > 1) { boost::asio::io_service ioservice; boost::thread_group threadpool; -- cgit v1.2.3