diff options
author | Howard Chu <hyc@symas.com> | 2017-09-14 04:39:37 +0100 |
---|---|---|
committer | Howard Chu <hyc@symas.com> | 2017-09-14 21:42:48 +0100 |
commit | 510d0d47537aea8eff2e9c855099c4d4839cef32 (patch) | |
tree | 79d1d9d4ae9edb914ba2fc8a96e7516ff29b200d | |
parent | Merge pull request #2438 (diff) | |
download | monero-510d0d47537aea8eff2e9c855099c4d4839cef32.tar.xz |
Use a threadpool
Instead of constantly creating and destroying threads
Diffstat (limited to '')
-rw-r--r-- | src/common/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/common/common_fwd.h | 3 | ||||
-rw-r--r-- | src/common/task_region.cpp | 94 | ||||
-rw-r--r-- | src/common/task_region.h | 223 | ||||
-rw-r--r-- | src/common/thread_group.cpp | 153 | ||||
-rw-r--r-- | src/common/thread_group.h | 143 | ||||
-rw-r--r-- | src/common/threadpool.cpp | 113 | ||||
-rw-r--r-- | src/common/threadpool.h | 87 | ||||
-rw-r--r-- | src/cryptonote_core/blockchain.cpp | 72 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.cpp | 77 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.h | 4 | ||||
-rw-r--r-- | src/ringct/rctSigs.cpp | 49 | ||||
-rw-r--r-- | src/wallet/wallet2.cpp | 63 | ||||
-rw-r--r-- | tests/unit_tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | tests/unit_tests/thread_group.cpp | 177 |
15 files changed, 302 insertions, 963 deletions
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 55b8ad3e6..19d90253b 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -37,8 +37,7 @@ set(common_sources i18n.cpp password.cpp perf_timer.cpp - task_region.cpp - thread_group.cpp + threadpool.cpp updates.cpp) if (STACK_TRACE) @@ -66,8 +65,7 @@ set(common_private_headers password.h perf_timer.h stack_trace.h - task_region.h - thread_group.h + threadpool.h updates.h) monero_private_headers(common diff --git a/src/common/common_fwd.h b/src/common/common_fwd.h index 5d67251b1..f33e185b5 100644 --- a/src/common/common_fwd.h +++ b/src/common/common_fwd.h @@ -36,6 +36,5 @@ namespace tools struct login; class password_container; class t_http_connection; - class task_region; - class thread_group; + class threadpool; } diff --git a/src/common/task_region.cpp b/src/common/task_region.cpp deleted file mode 100644 index 9b4620c6e..000000000 --- a/src/common/task_region.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2014-2017, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#include "common/task_region.h" - -#include <boost/thread/locks.hpp> -#include <cassert> - -/* `mark_completed` and `wait` can throw in the lock call, but its difficult to -recover from either. An exception in `wait` means the post condition of joining -all threads cannot be achieved, and an exception in `mark_completed` means -certain deadlock. `noexcept` qualifier will force a call to `std::terminate` if -locking throws an exception, which should only happen if a recursive lock -attempt is made (which is not possible since no external function is called -while holding the lock). */ - -namespace tools -{ -void task_region_handle::state::mark_completed(id task_id) noexcept { - assert(task_id != 0 && (task_id & (task_id - 1)) == 0); // power of 2 check - if (pending.fetch_and(~task_id) == task_id) { - // synchronize with wait call, but do not need to hold - boost::unique_lock<boost::mutex>{sync_on_complete}; - all_complete.notify_all(); - } -} - -void task_region_handle::state::abort() noexcept { - state* current = this; - while (current) { - current->ready = 0; - current = current->next.get(); - } -} - -void task_region_handle::state::wait() noexcept { - state* current = this; - while (current) { - { - boost::unique_lock<boost::mutex> lock{current->sync_on_complete}; - current->all_complete.wait(lock, [current] { return current->pending == 0; }); - } - current = current->next.get(); - } -} - -void task_region_handle::state::wait(thread_group& threads) noexcept { - state* current = this; - while (current) { - while (current->pending != 0) { - if (!threads.try_run_one()) { - current->wait(); - return; - } - } - current = current->next.get(); - } -} - -void task_region_handle::create_state() { - st = std::make_shared<state>(std::move(st)); - next_id = 1; -} - -void task_region_handle::do_wait() noexcept { - assert(st); - const std::shared_ptr<state> temp = std::move(st); - temp->wait(threads); -} -} diff --git a/src/common/task_region.h b/src/common/task_region.h deleted file mode 100644 index 30972cce3..000000000 --- a/src/common/task_region.h +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright (c) 2014-2017, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#pragma once - -#include <atomic> -#include <boost/thread/condition_variable.hpp> -#include <boost/thread/mutex.hpp> -#include <memory> -#include <type_traits> -#include <utility> - -#include "common/thread_group.h" - -namespace tools -{ - -/*! A model of the fork-join concept. `run(...)` "forks" (i.e. spawns new -tasks), and `~task_region_handle()` or `wait()` "joins" the spawned tasks. -`wait` will block until all tasks have completed, while `~task_region_handle()` -blocks until all tasks have completed or aborted. - -Do _NOT_ give this object to separate thread of execution (which includes -`task_region_handle::run(...)`) because joining on a different thread is -undesireable (potential deadlock). - -This class cannot be constructed directly, use the function -`task_region(...)` instead. -*/ -class task_region_handle -{ - struct state - { - using id = unsigned; - - explicit state(std::shared_ptr<state> next_src) noexcept - : next(std::move(next_src)) - , ready(0) - , pending(0) - , sync_on_complete() - , all_complete() { - } - - state(const state&) = default; - state(state&&) = default; - ~state() = default; - state& operator=(const state&) = default; - state& operator=(state&&) = default; - - void track_id(id task_id) noexcept { - pending |= task_id; - ready |= task_id; - } - - //! \return True only once whether a given id can execute - bool can_run(id task_id) noexcept { - return (ready.fetch_and(~task_id) & task_id); - } - - //! Mark id as completed, and synchronize with waiting threads - void mark_completed(id task_id) noexcept; - - //! Tell all unstarted functions in region to return immediately - void abort() noexcept; - - //! Blocks until all functions in region have aborted or completed. - void wait() noexcept; - - //! Same as `wait()`, except `this_thread` runs tasks while waiting. - void wait(thread_group& threads) noexcept; - - private: - /* This implementation is a bit pessimistic, it ensures that all copies - of a wrapped task can only be executed once. `thread_group` should never - do this, but some variable needs to track whether an abort should be done - anyway... */ - std::shared_ptr<state> next; - std::atomic<id> ready; //!< Tracks whether a task has been invoked - std::atomic<id> pending; //!< Tracks when a task has completed or aborted - boost::mutex sync_on_complete; - boost::condition_variable all_complete; - }; - - template<typename F> - struct wrapper - { - wrapper(state::id id_src, std::shared_ptr<state> st_src, F f_src) - : task_id(id_src), st(std::move(st_src)), f(std::move(f_src)) { - } - - wrapper(const wrapper&) = default; - wrapper(wrapper&&) = default; - wrapper& operator=(const wrapper&) = default; - wrapper& operator=(wrapper&&) = default; - - void operator()() { - if (st) { - if (st->can_run(task_id)) { - f(); - } - st->mark_completed(task_id); - } - } - - private: - const state::id task_id; - std::shared_ptr<state> st; - F f; - }; - -public: - friend struct task_region_; - - task_region_handle() = delete; - task_region_handle(const task_region_handle&) = delete; - task_region_handle(task_region_handle&&) = delete; - - //! Cancels unstarted pending tasks, and waits for them to respond. - ~task_region_handle() noexcept { - if (st) { - st->abort(); - st->wait(threads); - } - } - - task_region_handle& operator=(const task_region_handle&) = delete; - task_region_handle& operator=(task_region_handle&&) = delete; - - /*! If the group has no threads, `f` is immediately run before returning. - Otherwise, `f` is dispatched to the thread_group associated with `this` - region. If `f` is dispatched to another thread, and it throws, the process - will immediately terminate. See std::packaged_task for getting exceptions on - functions executed on other threads. */ - template<typename F> - void run(F&& f) { - if (threads.count() == 0) { - f(); - } else { - if (!st || next_id == 0) { - create_state(); - } - const state::id this_id = next_id; - next_id <<= 1; - - st->track_id(this_id); - threads.dispatch(wrapper<F>{this_id, st, std::move(f)}); - } - } - - //! Wait until all functions provided to `run` have completed. - void wait() noexcept { - if (st) { - do_wait(); - } - } - -private: - explicit task_region_handle(thread_group& threads_src) - : st(nullptr), threads(threads_src), next_id(0) { - } - - void create_state(); - void do_wait() noexcept; - - std::shared_ptr<state> st; - thread_group& threads; - state::id next_id; -}; - -/*! Function for creating a `task_region_handle`, which automatically calls -`task_region_handle::wait()` before returning. If a `thread_group` is not -provided, one is created with an optimal number of threads. The callback `f` -must have the signature `void(task_region_handle&)`. */ -struct task_region_ { - template<typename F> - void operator()(thread_group& threads, F&& f) const { - static_assert( - std::is_same<void, typename std::result_of<F(task_region_handle&)>::type>::value, - "f cannot have a return value" - ); - task_region_handle region{threads}; - f(region); - region.wait(); - } - - template<typename F> - void operator()(thread_group&& threads, F&& f) const { - (*this)(threads, std::forward<F>(f)); - } - - template<typename F> - void operator()(F&& f) const { - thread_group threads; - (*this)(threads, std::forward<F>(f)); - } -}; - -constexpr const task_region_ task_region{}; -} diff --git a/src/common/thread_group.cpp b/src/common/thread_group.cpp deleted file mode 100644 index 691a27a25..000000000 --- a/src/common/thread_group.cpp +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright (c) 2014-2017, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#include "common/thread_group.h" - -#include <boost/thread/locks.hpp> -#include <cassert> -#include <limits> -#include <stdexcept> - -#include "cryptonote_config.h" -#include "common/util.h" - -namespace tools -{ -std::size_t thread_group::optimal() { - static_assert( - std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(), - "unexpected truncation" - ); - const std::size_t hardware = get_max_concurrency(); - return hardware ? (hardware - 1) : 0; -} - -std::size_t thread_group::optimal_with_max(std::size_t count) { - return count ? std::min(count - 1, optimal()) : 0; -} - -thread_group::thread_group(std::size_t count) : internal() { - if (count) { - internal.emplace(count); - } -} - -thread_group::data::data(std::size_t count) - : threads() - , head{nullptr} - , last(std::addressof(head)) - , mutex() - , has_work() - , stop(false) { - threads.reserve(count); - boost::thread::attributes attrs; - attrs.set_stack_size(THREAD_STACK_SIZE); - while (count--) { - threads.push_back(boost::thread(attrs, boost::bind(&thread_group::data::run, this))); - } -} - -thread_group::data::~data() noexcept { - { - const boost::unique_lock<boost::mutex> lock(mutex); - stop = true; - } - has_work.notify_all(); - for (auto& worker : threads) { - try { - worker.join(); - } - catch(...) {} - } -} - -std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { - std::unique_ptr<work> rc = std::move(head.ptr); - if (rc != nullptr) { - head.ptr = std::move(rc->next.ptr); - if (head.ptr == nullptr) { - last = std::addressof(head); - } - } - return rc; -} - -bool thread_group::data::try_run_one() noexcept { - /* This function and `run()` can both throw when acquiring the lock, or in - dispatched function. It is tough to recover from either, particularly the - lock case. These functions are marked as noexcept so that if either call - throws, the entire process is terminated. Users of the `dispatch` call are - expected to make their functions noexcept, or use std::packaged_task to copy - exceptions so that the process will continue in all but the most pessimistic - cases (std::bad_alloc). This was the existing behavior; - `asio::io_service::run` propogates errors from dispatched calls, and uncaught - exceptions on threads result in process termination. */ - std::unique_ptr<work> next = nullptr; - { - const boost::unique_lock<boost::mutex> lock(mutex); - next = get_next(); - } - if (next) { - assert(next->f); - next->f(); - return true; - } - return false; -} - -void thread_group::data::run() noexcept { - // see `try_run_one()` source for additional information - while (true) { - std::unique_ptr<work> next = nullptr; - { - boost::unique_lock<boost::mutex> lock(mutex); - has_work.wait(lock, [this] { return head.ptr != nullptr || stop; }); - if (stop) { - return; - } - next = get_next(); - } - assert(next != nullptr); - assert(next->f); - next->f(); - } -} - -void thread_group::data::dispatch(std::function<void()> f) { - std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}}); - node* const latest_node = std::addressof(latest->next); - { - const boost::unique_lock<boost::mutex> lock(mutex); - assert(last != nullptr); - assert(last->ptr == nullptr); - - last->ptr = std::move(latest); - last = latest_node; - } - has_work.notify_one(); -} -} diff --git a/src/common/thread_group.h b/src/common/thread_group.h deleted file mode 100644 index 48fd4cd56..000000000 --- a/src/common/thread_group.h +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright (c) 2014-2017, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#pragma once - -#include <boost/optional/optional.hpp> -#include <boost/thread/condition_variable.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/thread.hpp> -#include <cstddef> -#include <functional> -#include <thread> -#include <utility> -#include <vector> - -namespace tools -{ -//! Manages zero or more threads for work dispatching. -class thread_group -{ -public: - - //! \return `get_max_concurrency() ? get_max_concurrency() - 1 : 0` - static std::size_t optimal(); - - //! \return `count ? min(count - 1, optimal()) : 0` - static std::size_t optimal_with_max(std::size_t count); - - //! Create an optimal number of threads. - explicit thread_group() : thread_group(optimal()) {} - - //! Create exactly `count` threads. - explicit thread_group(std::size_t count); - - thread_group(thread_group const&) = delete; - thread_group(thread_group&&) = delete; - - //! Joins threads, but does not necessarily run all dispatched functions. - ~thread_group() = default; - - thread_group& operator=(thread_group const&) = delete; - thread_group& operator=(thread_group&&) = delete; - - //! \return Number of threads owned by `this` group. - std::size_t count() const noexcept { - if (internal) { - return internal->count(); - } - return 0; - } - - //! \return True iff a function was available and executed (on `this_thread`). - bool try_run_one() noexcept { - if (internal) { - return internal->try_run_one(); - } - return false; - } - - /*! `f` is invoked immediately if `count() == 0`, otherwise execution of `f` - is queued for next available thread. If `f` is queued, any exception leaving - that function will result in process termination. Use std::packaged_task if - exceptions need to be handled. */ - template<typename F> - void dispatch(F&& f) { - if (internal) { - internal->dispatch(std::forward<F>(f)); - } - else { - f(); - } - } - -private: - class data { - public: - data(std::size_t count); - ~data() noexcept; - - std::size_t count() const noexcept { - return threads.size(); - } - - bool try_run_one() noexcept; - void dispatch(std::function<void()> f); - - private: - struct work; - - struct node { - std::unique_ptr<work> ptr; - }; - - struct work { - std::function<void()> f; - node next; - }; - - //! Requires lock on `mutex`. - std::unique_ptr<work> get_next() noexcept; - - //! Blocks until destructor is invoked, only call from thread. - void run() noexcept; - - private: - std::vector<boost::thread> threads; - node head; - node* last; - boost::condition_variable has_work; - boost::mutex mutex; - bool stop; - }; - -private: - // optionally construct elements, without separate heap allocation - boost::optional<data> internal; -}; - -} diff --git a/src/common/threadpool.cpp b/src/common/threadpool.cpp new file mode 100644 index 000000000..f7f9bbbaf --- /dev/null +++ b/src/common/threadpool.cpp @@ -0,0 +1,113 @@ +// Copyright (c) 2017, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include "common/threadpool.h" + +#include <cassert> +#include <limits> +#include <stdexcept> + +#include "cryptonote_config.h" +#include "common/util.h" + +namespace tools +{ +threadpool::threadpool() : running(true), active(0) { + boost::thread::attributes attrs; + attrs.set_stack_size(THREAD_STACK_SIZE); + max = tools::get_max_concurrency(); + size_t i = max; + while(i--) { + threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this))); + } +} + +threadpool::~threadpool() { + { + const boost::unique_lock<boost::mutex> lock(mutex); + running = false; + has_work.notify_all(); + } + for (size_t i = 0; i<threads.size(); i++) { + threads[i].join(); + } +} + +void threadpool::submit(waiter *obj, std::function<void()> f) { + entry e = {obj, f}; + boost::unique_lock<boost::mutex> lock(mutex); + if (active == max && !queue.empty()) { + // if all available threads are already running + // and there's work waiting, just run in current thread + lock.unlock(); + f(); + } else { + if (obj) + obj->inc(); + queue.push_back(e); + has_work.notify_one(); + } +} + +void threadpool::waiter::wait() { + boost::unique_lock<boost::mutex> lock(mt); + while(num) cv.wait(lock); +} + +void threadpool::waiter::inc() { + const boost::unique_lock<boost::mutex> lock(mt); + num++; +} + +void threadpool::waiter::dec() { + const boost::unique_lock<boost::mutex> lock(mt); + num--; + if (!num) + cv.notify_one(); +} + +void threadpool::run() { + boost::unique_lock<boost::mutex> lock(mutex); + while (running) { + entry e; + while(queue.empty() && running) + has_work.wait(lock); + if (!running) break; + + active++; + e = queue.front(); + queue.pop_front(); + lock.unlock(); + e.f(); + + if (e.wo) + e.wo->dec(); + lock.lock(); + active--; + } +} +} diff --git a/src/common/threadpool.h b/src/common/threadpool.h new file mode 100644 index 000000000..9455e9f66 --- /dev/null +++ b/src/common/threadpool.h @@ -0,0 +1,87 @@ +// Copyright (c) 2017, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <cstddef> +#include <functional> +#include <utility> +#include <vector> + +namespace tools +{ +//! A global thread pool +class threadpool +{ +public: + static threadpool& getInstance() { + static threadpool instance; + return instance; + } + + // The waiter lets the caller know when all of its + // tasks are completed. + class waiter { + boost::mutex mt; + boost::condition_variable cv; + int num; + public: + void inc(); + void dec(); + void wait(); //! Wait for a set of tasks to finish. + waiter() : num(0){} + ~waiter() { wait(); } + }; + + // Submit a task to the pool. The waiter pointer may be + // NULL if the caller doesn't care to wait for the + // task to finish. + void submit(waiter *waiter, std::function<void()> f); + + int get_max_concurrency() { return max; } + + private: + threadpool(); + ~threadpool(); + typedef struct entry { + waiter *wo; + std::function<void()> f; + } entry; + std::deque<entry> queue; + boost::condition_variable has_work; + boost::mutex mutex; + std::vector<boost::thread> threads; + int active; + int max; + bool running; + void run(); +}; + +} diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index 69d2edf65..7226a19b2 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -45,6 +45,7 @@ #include "profile_tools.h" #include "file_io_utils.h" #include "common/int-util.h" +#include "common/threadpool.h" #include "common/boost_serialization_helper.h" #include "warnings.h" #include "crypto/hash.h" @@ -2513,33 +2514,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, std::vector < uint64_t > results; results.resize(tx.vin.size(), 0); - int threads = tools::get_max_concurrency(); - - boost::asio::io_service ioservice; - boost::thread_group threadpool; - bool ioservice_active = false; - - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - if(threads > 1) - { - for (int i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } - ioservice_active = true; - } - -#define KILL_IOSERVICE() \ - if(ioservice_active) \ - { \ - work.reset(); \ - while (!ioservice.stopped()) ioservice.poll(); \ - threadpool.join_all(); \ - ioservice.stop(); \ - ioservice_active = false; \ - } - - epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); }); + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; + int threads = tpool.get_max_concurrency(); for (const auto& txin : tx.vin) { @@ -2600,7 +2577,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, { // ND: Speedup // 1. Thread ring signature verification if possible. - ioservice.dispatch(boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); + tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); } else { @@ -2623,8 +2600,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, sig_index++; } - - KILL_IOSERVICE(); + if (tx.version == 1 && threads > 1) + waiter.wait(); if (tx.version == 1) { @@ -3699,7 +3676,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e return true; bool blocks_exist = false; - uint64_t threads = tools::get_max_concurrency(); + tools::threadpool& tpool = tools::threadpool::getInstance(); + uint64_t threads = tpool.get_max_concurrency(); if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1) { @@ -3708,15 +3686,12 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e threads = m_max_prepare_blocks_threads; uint64_t height = m_db->height(); - std::vector<boost::thread *> thread_list; int batches = blocks_entry.size() / threads; int extra = blocks_entry.size() % threads; MDEBUG("block_batches: " << batches); std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads); std::vector < std::vector < block >> blocks(threads); auto it = blocks_entry.begin(); - boost::thread::attributes attrs; - attrs.set_stack_size(THREAD_STACK_SIZE); for (uint64_t i = 0; i < threads; i++) { @@ -3775,19 +3750,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e { m_blocks_longhash_table.clear(); uint64_t thread_height = height; + tools::threadpool::waiter waiter; for (uint64_t i = 0; i < threads; i++) { - thread_list.push_back(new boost::thread(attrs, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])))); + tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i]))); thread_height += blocks[i].size(); } - for (size_t j = 0; j < thread_list.size(); j++) - { - thread_list[j]->join(); - delete thread_list[j]; - } - - thread_list.clear(); + waiter.wait(); if (m_cancel) return false; @@ -3911,30 +3881,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e // [output] stores all transactions for each tx_out_index::hash found std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size()); - threads = tools::get_max_concurrency(); + threads = tpool.get_max_concurrency(); if (!m_db->can_thread_bulk_indices()) threads = 1; if (threads > 1) { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - - for (uint64_t i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } + tools::threadpool::waiter waiter; for (size_t i = 0; i < amounts.size(); i++) { uint64_t amount = amounts[i]; - ioservice.dispatch(boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i]))); + tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i]))); } - - work.reset(); - threadpool.join_all(); - ioservice.stop(); + waiter.wait(); } else { diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index c3aeb15f0..903e9661c 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -37,7 +37,7 @@ using namespace epee; #include "common/util.h" #include "common/updates.h" #include "common/download.h" -#include "common/task_region.h" +#include "common/threadpool.h" #include "warnings.h" #include "crypto/crypto.h" #include "cryptonote_config.h" @@ -74,7 +74,7 @@ namespace cryptonote m_last_dns_checkpoints_update(0), m_last_json_checkpoints_update(0), m_disable_dns_checkpoints(false), - m_threadpool(tools::thread_group::optimal()), + m_threadpool(tools::threadpool::getInstance()), m_update_download(0) { m_checkpoints_updating.clear(); @@ -591,54 +591,53 @@ namespace cryptonote std::vector<result> results(tx_blobs.size()); tvc.resize(tx_blobs.size()); - tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { - std::list<blobdata>::const_iterator it = tx_blobs.begin(); - for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { - region.run([&, i, it] { + tools::threadpool::waiter waiter; + std::list<blobdata>::const_iterator it = tx_blobs.begin(); + for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { + m_threadpool.submit(&waiter, [&, i, it] { + try + { + results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); + } + catch (const std::exception &e) + { + MERROR_VER("Exception in handle_incoming_tx_pre: " << e.what()); + results[i].res = false; + } + }); + } + waiter.wait(); + it = tx_blobs.begin(); + for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { + if (!results[i].res) + continue; + if(m_mempool.have_tx(results[i].hash)) + { + LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool"); + } + else if(m_blockchain_storage.have_tx(results[i].hash)) + { + LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain"); + } + else + { + m_threadpool.submit(&waiter, [&, i, it] { try { - results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); + results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); } catch (const std::exception &e) { - MERROR_VER("Exception in handle_incoming_tx_pre: " << e.what()); + MERROR_VER("Exception in handle_incoming_tx_post: " << e.what()); results[i].res = false; } }); } - }); - tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { - std::list<blobdata>::const_iterator it = tx_blobs.begin(); - for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { - if (!results[i].res) - continue; - if(m_mempool.have_tx(results[i].hash)) - { - LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool"); - } - else if(m_blockchain_storage.have_tx(results[i].hash)) - { - LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain"); - } - else - { - region.run([&, i, it] { - try - { - results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); - } - catch (const std::exception &e) - { - MERROR_VER("Exception in handle_incoming_tx_post: " << e.what()); - results[i].res = false; - } - }); - } - } - }); + } + waiter.wait(); bool ok = true; - std::list<blobdata>::const_iterator it = tx_blobs.begin(); + it = tx_blobs.begin(); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { if (!results[i].res) { diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 8e17569a9..0a64b4b4a 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -40,7 +40,7 @@ #include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "storages/portable_storage_template_helper.h" #include "common/download.h" -#include "common/thread_group.h" +#include "common/threadpool.h" #include "tx_pool.h" #include "blockchain.h" #include "cryptonote_basic/miner.h" @@ -940,7 +940,7 @@ namespace cryptonote std::unordered_set<crypto::hash> bad_semantics_txes[2]; boost::mutex bad_semantics_txes_lock; - tools::thread_group m_threadpool; + tools::threadpool& m_threadpool; enum { UPDATES_DISABLED, diff --git a/src/ringct/rctSigs.cpp b/src/ringct/rctSigs.cpp index 8efd6a07c..946325367 100644 --- a/src/ringct/rctSigs.cpp +++ b/src/ringct/rctSigs.cpp @@ -30,8 +30,7 @@ #include "misc_log_ex.h" #include "common/perf_timer.h" -#include "common/task_region.h" -#include "common/thread_group.h" +#include "common/threadpool.h" #include "common/util.h" #include "rctSigs.h" #include "cryptonote_basic/cryptonote_format_utils.h" @@ -731,17 +730,16 @@ namespace rct { try { if (semantics) { + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; std::deque<bool> results(rv.outPk.size(), false); - tools::thread_group threadpool(tools::thread_group::optimal_with_max(rv.outPk.size())); - - tools::task_region(threadpool, [&] (tools::task_region_handle& region) { - DP("range proofs verified?"); - for (size_t i = 0; i < rv.outPk.size(); i++) { - region.run([&, i] { - results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); - }); - } - }); + DP("range proofs verified?"); + for (size_t i = 0; i < rv.outPk.size(); i++) { + tpool.submit(&waiter, [&, i] { + results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); + }); + } + waiter.wait(); for (size_t i = 0; i < rv.outPk.size(); ++i) { if (!results[i]) { @@ -794,7 +792,8 @@ namespace rct { const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size()); std::deque<bool> results(threads); - tools::thread_group threadpool(tools::thread_group::optimal_with_max(threads)); + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; if (semantics) { key sumOutpks = identity(); @@ -819,13 +818,12 @@ namespace rct { results.clear(); results.resize(rv.outPk.size()); - tools::task_region(threadpool, [&] (tools::task_region_handle& region) { - for (size_t i = 0; i < rv.outPk.size(); i++) { - region.run([&, i] { - results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); - }); - } - }); + for (size_t i = 0; i < rv.outPk.size(); i++) { + tpool.submit(&waiter, [&, i] { + results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); + }); + } + waiter.wait(); for (size_t i = 0; i < results.size(); ++i) { if (!results[i]) { @@ -839,13 +837,12 @@ namespace rct { results.clear(); results.resize(rv.mixRing.size()); - tools::task_region(threadpool, [&] (tools::task_region_handle& region) { - for (size_t i = 0 ; i < rv.mixRing.size() ; i++) { - region.run([&, i] { + for (size_t i = 0 ; i < rv.mixRing.size() ; i++) { + tpool.submit(&waiter, [&, i] { results[i] = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], rv.pseudoOuts[i]); - }); - } - }); + }); + } + waiter.wait(); for (size_t i = 0; i < results.size(); ++i) { if (!results[i]) { diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index 323a3a7fe..f72d281c7 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -45,6 +45,7 @@ using namespace epee; #include "cryptonote_basic/cryptonote_basic_impl.h" #include "common/boost_serialization_helper.h" #include "common/command_line.h" +#include "common/threadpool.h" #include "profile_tools.h" #include "crypto/crypto.h" #include "serialization/binary_utils.h" @@ -89,14 +90,6 @@ using namespace cryptonote; #define SECOND_OUTPUT_RELATEDNESS_THRESHOLD 0.0f -#define KILL_IOSERVICE() \ - do { \ - work.reset(); \ - while (!ioservice.stopped()) ioservice.poll(); \ - threadpool.join_all(); \ - ioservice.stop(); \ - } while(0) - #define KEY_IMAGE_EXPORT_FILE_MAGIC "Monero key image export\002" namespace @@ -684,7 +677,9 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote std::deque<crypto::key_image> ki(tx.vout.size()); std::deque<uint64_t> amount(tx.vout.size()); std::deque<rct::key> mask(tx.vout.size()); - int threads = tools::get_max_concurrency(); + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; + int threads = tpool.get_max_concurrency(); const cryptonote::account_keys& keys = m_account.get_keys(); crypto::key_derivation derivation; generate_key_derivation(tx_pub_key, keys.m_view_secret_key, derivation); @@ -720,13 +715,6 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote ++num_vouts_received; // process the other outs from that tx - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - for (int i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } std::vector<uint64_t> money_transfered(tx.vout.size()); std::deque<bool> error(tx.vout.size()); @@ -734,10 +722,10 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote // the first one was already checked for (size_t i = 1; i < tx.vout.size(); ++i) { - ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, + tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i]))); } - KILL_IOSERVICE(); + waiter.wait(); for (size_t i = 1; i < tx.vout.size(); ++i) { if (error[i]) @@ -766,23 +754,18 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote } else if (tx.vout.size() > 1 && threads > 1) { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - for (int i = 0; i < threads; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; std::vector<uint64_t> money_transfered(tx.vout.size()); std::deque<bool> error(tx.vout.size()); std::deque<bool> received(tx.vout.size()); for (size_t i = 0; i < tx.vout.size(); ++i) { - ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, + tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i]))); } - KILL_IOSERVICE(); + waiter.wait(); tx_money_got_in_outs = 0; for (size_t i = 0; i < tx.vout.size(); ++i) { @@ -1251,7 +1234,8 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote:: THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch"); - int threads = tools::get_max_concurrency(); + tools::threadpool& tpool = tools::threadpool::getInstance(); + int threads = tpool.get_max_concurrency(); if (threads > 1) { std::vector<crypto::hash> round_block_hashes(threads); @@ -1262,23 +1246,16 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote:: for (size_t b = 0; b < blocks_size; b += threads) { size_t round_size = std::min((size_t)threads, blocks_size - b); - - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice)); - for (size_t i = 0; i < round_size; i++) - { - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - } + tools::threadpool::waiter waiter; std::list<block_complete_entry>::const_iterator tmpblocki = blocki; for (size_t i = 0; i < round_size; ++i) { - ioservice.dispatch(boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block), + tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block), std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i]))); ++tmpblocki; } - KILL_IOSERVICE(); + waiter.wait(); tmpblocki = blocki; for (size_t i = 0; i < round_size; ++i) { @@ -1698,7 +1675,8 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash; std::list<crypto::hash> short_chain_history; - boost::thread pull_thread; + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; uint64_t blocks_start_height; std::list<cryptonote::block_complete_entry> blocks; std::vector<COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices; @@ -1736,11 +1714,11 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re std::list<cryptonote::block_complete_entry> next_blocks; std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> next_o_indices; bool error = false; - pull_thread = boost::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);}); + tpool.submit(&waiter, [&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);}); process_blocks(blocks_start_height, blocks, o_indices, added_blocks); blocks_fetched += added_blocks; - pull_thread.join(); + waiter.wait(); if(blocks_start_height == next_blocks_start_height) { m_node_rpc_proxy.set_height(m_blockchain.size()); @@ -1762,8 +1740,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re catch (const std::exception&) { blocks_fetched += added_blocks; - if (pull_thread.joinable()) - pull_thread.join(); + waiter.wait(); if(try_count < 3) { LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")..."); diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index 2f62dc2aa..f5e08b102 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -55,7 +55,6 @@ set(unit_tests_sources test_tx_utils.cpp test_peerlist.cpp test_protocol_pack.cpp - thread_group.cpp hardfork.cpp unbound.cpp uri.cpp diff --git a/tests/unit_tests/thread_group.cpp b/tests/unit_tests/thread_group.cpp deleted file mode 100644 index c0ba3d38b..000000000 --- a/tests/unit_tests/thread_group.cpp +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright (c) 2014-2017, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#include "gtest/gtest.h" - -#include <atomic> -#include "common/task_region.h" -#include "common/thread_group.h" - -TEST(ThreadGroup, NoThreads) -{ - tools::task_region(tools::thread_group(0), [] (tools::task_region_handle& region) { - std::atomic<bool> completed{false}; - region.run([&] { completed = true; }); - EXPECT_TRUE(completed); - }); - { - tools::thread_group group(0); - std::atomic<bool> completed{false}; - group.dispatch([&] { completed = true; }); - EXPECT_TRUE(completed); - } -} - -TEST(ThreadGroup, OneThread) -{ - tools::thread_group group(1); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<bool> completed{false}; - tools::task_region(group, [&] (tools::task_region_handle& region) { - region.run([&] { completed = true; }); - }); - EXPECT_TRUE(completed); - } -} - - -TEST(ThreadGroup, UseActiveThreadOnSync) -{ - tools::thread_group group(1); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<bool> completed{false}; - tools::task_region(group, [&] (tools::task_region_handle& region) { - region.run([&] { while (!completed); }); - region.run([&] { completed = true; }); - }); - EXPECT_TRUE(completed); - } -} - -TEST(ThreadGroup, InOrder) -{ - tools::thread_group group(1); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<unsigned> count{0}; - std::atomic<bool> completed{false}; - tools::task_region(group, [&] (tools::task_region_handle& region) { - region.run([&] { while (!completed); }); - region.run([&] { if (count == 0) completed = true; }); - region.run([&] { ++count; }); - }); - EXPECT_TRUE(completed); - EXPECT_EQ(1u, count); - } -} - -TEST(ThreadGroup, TwoThreads) -{ - tools::thread_group group(2); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<bool> completed{false}; - tools::task_region(group, [&] (tools::task_region_handle& region) { - region.run([&] { while (!completed); }); - region.run([&] { while (!completed); }); - region.run([&] { completed = true; }); - }); - EXPECT_TRUE(completed); - } -} - -TEST(ThreadGroup, Nested) { - struct fib { - unsigned operator()(tools::thread_group& group, unsigned value) const { - if (value == 0 || value == 1) { - return value; - } - unsigned left = 0; - unsigned right = 0; - tools::task_region(group, [&, value] (tools::task_region_handle& region) { - region.run([&, value] { left = fib{}(group, value - 1); }); - region.run([&, value] { right = fib{}(group, value - 2); } ); - }); - return left + right; - } - - unsigned operator()(tools::thread_group&& group, unsigned value) const { - return (*this)(group, value); - } - }; - // be careful of depth on asynchronous version - EXPECT_EQ(6765, fib{}(tools::thread_group(0), 20)); - EXPECT_EQ(377, fib{}(tools::thread_group(1), 14)); -} - -TEST(ThreadGroup, Many) -{ - tools::thread_group group(1); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<unsigned> count{0}; - tools::task_region(group, [&] (tools::task_region_handle& region) { - for (unsigned tasks = 0; tasks < 1000; ++tasks) { - region.run([&] { ++count; }); - } - }); - EXPECT_EQ(1000u, count); - } -} - -TEST(ThreadGroup, ThrowInTaskRegion) -{ - class test_exception final : std::exception { - public: - explicit test_exception() : std::exception() {} - - virtual const char* what() const noexcept override { - return "test_exception"; - } - }; - - tools::thread_group group(1); - - for (unsigned i = 0; i < 3; ++i) { - std::atomic<unsigned> count{0}; - EXPECT_THROW( - [&] { - tools::task_region(group, [&] (tools::task_region_handle& region) { - for (unsigned tasks = 0; tasks < 1000; ++tasks) { - region.run([&] { ++count; }); - } - throw test_exception(); - }); - }(), - test_exception - ); - EXPECT_GE(1000u, count); - } -} |