diff options
author | Lee Clagett <code@leeclagett.com> | 2016-11-21 14:48:42 -0500 |
---|---|---|
committer | Lee Clagett <code@leeclagett.com> | 2016-11-23 14:41:25 -0500 |
commit | f025198f195516da8654bece64bf7a1fb85be3cd (patch) | |
tree | ef760c61553d5a51d09bcfbb4b02840169ab5c13 /src/common | |
parent | Merge pull request #1346 (diff) | |
download | monero-f025198f195516da8654bece64bf7a1fb85be3cd.tar.xz |
Added task_region - a fork/join task implementation
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/common/task_region.cpp | 94 | ||||
-rw-r--r-- | src/common/task_region.h | 223 | ||||
-rw-r--r-- | src/common/thread_group.cpp | 86 | ||||
-rw-r--r-- | src/common/thread_group.h | 66 |
5 files changed, 394 insertions, 77 deletions
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index d5d22bca6..dd17f6d64 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -33,6 +33,7 @@ set(common_sources util.cpp i18n.cpp perf_timer.cpp + task_region.cpp thread_group.cpp) if (STACK_TRACE) @@ -57,6 +58,7 @@ set(common_private_headers i18n.h perf_timer.h stack_trace.h + task_region.h thread_group.h) monero_private_headers(common diff --git a/src/common/task_region.cpp b/src/common/task_region.cpp new file mode 100644 index 000000000..b53a8376a --- /dev/null +++ b/src/common/task_region.cpp @@ -0,0 +1,94 @@ +// Copyright (c) 2014-2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include "common/task_region.h" + +#include <boost/thread/locks.hpp> +#include <cassert> + +/* `mark_completed` and `wait` can throw in the lock call, but its difficult to +recover from either. An exception in `wait` means the post condition of joining +all threads cannot be achieved, and an exception in `mark_completed` means +certain deadlock. `noexcept` qualifier will force a call to `std::terminate` if +locking throws an exception, which should only happen if a recursive lock +attempt is made (which is not possible since no external function is called +while holding the lock). */ + +namespace tools +{ +void task_region_handle::state::mark_completed(id task_id) noexcept { + assert(task_id != 0 && (task_id & (task_id - 1)) == 0); // power of 2 check + if (pending.fetch_and(~task_id) == task_id) { + // synchronize with wait call, but do not need to hold + boost::unique_lock<boost::mutex>{sync_on_complete}; + all_complete.notify_all(); + } +} + +void task_region_handle::state::abort() noexcept { + state* current = this; + while (current) { + current->ready = 0; + current = current->next.get(); + } +} + +void task_region_handle::state::wait() noexcept { + state* current = this; + while (current) { + { + boost::unique_lock<boost::mutex> lock{current->sync_on_complete}; + current->all_complete.wait(lock, [current] { return current->pending == 0; }); + } + current = current->next.get(); + } +} + +void task_region_handle::state::wait(thread_group& threads) noexcept { + state* current = this; + while (current) { + while (current->pending != 0) { + if (!threads.try_run_one()) { + current->wait(); + return; + } + } + current = current->next.get(); + } +} + +void task_region_handle::create_state() { + st = std::make_shared<state>(std::move(st)); + next_id = 1; +} + +void task_region_handle::do_wait() noexcept { + assert(st); + const std::shared_ptr<state> temp = std::move(st); + temp->wait(threads); +} +} diff --git a/src/common/task_region.h b/src/common/task_region.h new file mode 100644 index 000000000..e4d210661 --- /dev/null +++ b/src/common/task_region.h @@ -0,0 +1,223 @@ +// Copyright (c) 2014-2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include <atomic> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <memory> +#include <type_traits> +#include <utility> + +#include "common/thread_group.h" + +namespace tools +{ + +/*! A model of the fork-join concept. `run(...)` "forks" (i.e. spawns new +tasks), and `~task_region_handle()` or `wait()` "joins" the spawned tasks. +`wait` will block until all tasks have completed, while `~task_region_handle()` +blocks until all tasks have completed or aborted. + +Do _NOT_ give this object to separate thread of execution (which includes +`task_region_handle::run(...)`) because joining on a different thread is +undesireable (potential deadlock). + +This class cannot be constructed directly, use the function +`task_region(...)` instead. +*/ +class task_region_handle +{ + struct state + { + using id = unsigned; + + explicit state(std::shared_ptr<state> next_src) noexcept + : next(std::move(next_src)) + , ready(0) + , pending(0) + , sync_on_complete() + , all_complete() { + } + + state(const state&) = default; + state(state&&) = default; + ~state() = default; + state& operator=(const state&) = default; + state& operator=(state&&) = default; + + void track_id(id task_id) noexcept { + pending |= task_id; + ready |= task_id; + } + + //! \return True only once whether a given id can execute + bool can_run(id task_id) noexcept { + return (ready.fetch_and(~task_id) & task_id); + } + + //! Mark id as completed, and synchronize with waiting threads + void mark_completed(id task_id) noexcept; + + //! Tell all unstarted functions in region to return immediately + void abort() noexcept; + + //! Blocks until all functions in region have aborted or completed. + void wait() noexcept; + + //! Same as `wait()`, except `this_thread` runs tasks while waiting. + void wait(thread_group& threads) noexcept; + + private: + /* This implementation is a bit pessimistic, it ensures that all copies + of a wrapped task can only be executed once. `thread_group` should never + do this, but some variable needs to track whether an abort should be done + anyway... */ + std::shared_ptr<state> next; + std::atomic<id> ready; //!< Tracks whether a task has been invoked + std::atomic<id> pending; //!< Tracks when a task has completed or aborted + boost::mutex sync_on_complete; + boost::condition_variable all_complete; + }; + + template<typename F> + struct wrapper + { + wrapper(state::id id_src, std::shared_ptr<state> st_src, F f_src) + : task_id(id_src), st(std::move(st_src)), f(std::move(f_src)) { + } + + wrapper(const wrapper&) = default; + wrapper(wrapper&&) = default; + wrapper& operator=(const wrapper&) = default; + wrapper& operator=(wrapper&&) = default; + + void operator()() { + if (st) { + if (st->can_run(task_id)) { + f(); + } + st->mark_completed(task_id); + } + } + + private: + const state::id task_id; + std::shared_ptr<state> st; + F f; + }; + +public: + friend struct task_region_; + + task_region_handle() = delete; + task_region_handle(const task_region_handle&) = delete; + task_region_handle(task_region_handle&&) = delete; + + //! Cancels unstarted pending tasks, and waits for them to respond. + ~task_region_handle() noexcept { + if (st) { + st->abort(); + st->wait(threads); + } + } + + task_region_handle& operator=(const task_region_handle&) = delete; + task_region_handle& operator=(task_region_handle&&) = delete; + + /*! If the group has no threads, `f` is immediately run before returning. + Otherwise, `f` is dispatched to the thread_group associated with `this` + region. If `f` is dispatched to another thread, and it throws, the process + will immediately terminate. See std::packaged_task for getting exceptions on + functions executed on other threads. */ + template<typename F> + void run(F&& f) { + if (threads.count() == 0) { + f(); + } else { + if (!st || next_id == 0) { + create_state(); + } + const state::id this_id = next_id; + next_id <<= 1; + + st->track_id(this_id); + threads.dispatch(wrapper<F>{this_id, st, std::move(f)}); + } + } + + //! Wait until all functions provided to `run` have completed. + void wait() noexcept { + if (st) { + do_wait(); + } + } + +private: + explicit task_region_handle(thread_group& threads_src) + : st(nullptr), threads(threads_src), next_id(0) { + } + + void create_state(); + void do_wait() noexcept; + + std::shared_ptr<state> st; + thread_group& threads; + state::id next_id; +}; + +/*! Function for creating a `task_region_handle`, which automatically calls +`task_region_handle::wait()` before returning. If a `thread_group` is not +provided, one is created with an optimal number of threads. The callback `f` +must have the signature `void(task_region_handle&)`. */ +struct task_region_ { + template<typename F> + void operator()(thread_group& threads, F&& f) const { + static_assert( + std::is_same<void, typename std::result_of<F(task_region_handle&)>::type>::value, + "f cannot have a return value" + ); + task_region_handle region{threads}; + f(region); + region.wait(); + } + + template<typename F> + void operator()(thread_group&& threads, F&& f) const { + (*this)(threads, std::forward<F>(f)); + } + + template<typename F> + void operator()(F&& f) const { + thread_group threads; + (*this)(threads, std::forward<F>(f)); + } +}; + +constexpr const task_region_ task_region{}; +} diff --git a/src/common/thread_group.cpp b/src/common/thread_group.cpp index aa1b64f2e..4e1cc8964 100644 --- a/src/common/thread_group.cpp +++ b/src/common/thread_group.cpp @@ -27,6 +27,7 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "common/thread_group.h" +#include <boost/thread/locks.hpp> #include <cassert> #include <limits> #include <stdexcept> @@ -35,14 +36,20 @@ namespace tools { -thread_group::thread_group(std::size_t count) : internal() { +std::size_t thread_group::optimal() { static_assert( std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(), "unexpected truncation" ); - count = std::min<std::size_t>(count, get_max_concurrency()); - count = count ? count - 1 : 0; + const std::size_t hardware = get_max_concurrency(); + return hardware ? (hardware - 1) : 0; +} + +std::size_t thread_group::optimal_with_max(std::size_t count) { + return count ? std::min(count - 1, optimal()) : 0; +} +thread_group::thread_group(std::size_t count) : internal() { if (count) { internal.emplace(count); } @@ -52,24 +59,21 @@ thread_group::data::data(std::size_t count) : threads() , head{nullptr} , last(std::addressof(head)) - , pending(count) , mutex() , has_work() - , finished_work() , stop(false) { threads.reserve(count); while (count--) { - threads.push_back(std::thread(&thread_group::data::run, this)); + threads.push_back(boost::thread(&thread_group::data::run, this)); } } thread_group::data::~data() noexcept { { - const std::unique_lock<std::mutex> lock(mutex); + const boost::unique_lock<boost::mutex> lock(mutex); stop = true; } has_work.notify_all(); - finished_work.notify_all(); for (auto& worker : threads) { try { worker.join(); @@ -78,10 +82,20 @@ thread_group::data::~data() noexcept { } } +std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { + std::unique_ptr<work> rc = std::move(head.ptr); + if (rc != nullptr) { + head.ptr = std::move(rc->next.ptr); + if (head.ptr == nullptr) { + last = std::addressof(head); + } + } + return rc; +} -void thread_group::data::sync() noexcept { +bool thread_group::data::try_run_one() noexcept { /* This function and `run()` can both throw when acquiring the lock, or in - the dispatched function. It is tough to recover from either, particularly the + dispatched function. It is tough to recover from either, particularly the lock case. These functions are marked as noexcept so that if either call throws, the entire process is terminated. Users of the `dispatch` call are expected to make their functions noexcept, or use std::packaged_task to copy @@ -89,50 +103,25 @@ void thread_group::data::sync() noexcept { cases (std::bad_alloc). This was the existing behavior; `asio::io_service::run` propogates errors from dispatched calls, and uncaught exceptions on threads result in process termination. */ - assert(!threads.empty()); - bool not_first = false; - while (true) { - std::unique_ptr<work> next = nullptr; - { - std::unique_lock<std::mutex> lock(mutex); - pending -= std::size_t(not_first); - not_first = true; - finished_work.notify_all(); - - if (stop) { - return; - } - - next = get_next(); - if (next == nullptr) { - finished_work.wait(lock, [this] { return pending == 0 || stop; }); - return; - } - } + std::unique_ptr<work> next = nullptr; + { + const boost::unique_lock<boost::mutex> lock(mutex); + next = get_next(); + } + if (next) { assert(next->f); next->f(); + return true; } -} - -std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { - std::unique_ptr<work> rc = std::move(head.ptr); - if (rc != nullptr) { - head.ptr = std::move(rc->next.ptr); - if (head.ptr == nullptr) { - last = std::addressof(head); - } - } - return rc; + return false; } void thread_group::data::run() noexcept { - // see `sync()` source for additional information + // see `try_run_one()` source for additional information while (true) { std::unique_ptr<work> next = nullptr; { - std::unique_lock<std::mutex> lock(mutex); - --pending; - finished_work.notify_all(); + boost::unique_lock<boost::mutex> lock(mutex); has_work.wait(lock, [this] { return head.ptr != nullptr || stop; }); if (stop) { return; @@ -149,15 +138,12 @@ void thread_group::data::dispatch(std::function<void()> f) { std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}}); node* const latest_node = std::addressof(latest->next); { - const std::unique_lock<std::mutex> lock(mutex); + const boost::unique_lock<boost::mutex> lock(mutex); assert(last != nullptr); assert(last->ptr == nullptr); - if (pending == std::numeric_limits<std::size_t>::max()) { - throw std::overflow_error("thread_group exceeded max queue depth"); - } + last->ptr = std::move(latest); last = latest_node; - ++pending; } has_work.notify_one(); } diff --git a/src/common/thread_group.h b/src/common/thread_group.h index d8461d49a..62e82d832 100644 --- a/src/common/thread_group.h +++ b/src/common/thread_group.h @@ -25,8 +25,12 @@ // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + #include <boost/optional/optional.hpp> -#include <condition_variable> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> #include <cstddef> #include <functional> #include <thread> @@ -35,11 +39,21 @@ namespace tools { -//! Manages zero or more threads for work dispatching +//! Manages zero or more threads for work dispatching. class thread_group { public: - //! Create `min(count, get_max_concurrency()) - 1` threads + + //! \return `get_max_concurrency() ? get_max_concurrency() - 1 : 0` + static std::size_t optimal(); + + //! \return `count ? min(count - 1, optimal()) : 0` + static std::size_t optimal_with_max(std::size_t count); + + //! Create an optimal number of threads. + explicit thread_group() : thread_group(optimal()) {} + + //! Create exactly `count` threads. explicit thread_group(std::size_t count); thread_group(thread_group const&) = delete; @@ -51,30 +65,26 @@ public: thread_group& operator=(thread_group const&) = delete; thread_group& operator=(thread_group&&) = delete; - /*! Blocks until all functions provided to `dispatch` complete. Does not - destroy threads. If a dispatched function calls `this->dispatch(...)`, - `this->sync()` will continue to block until that new function completes. */ - void sync() noexcept { + //! \return Number of threads owned by `this` group. + std::size_t count() const noexcept { if (internal) { - internal->sync(); + return internal->count(); } + return 0; } - /*! Example usage: - std::unique_ptr<thread_group, thread_group::lazy_sync> sync(std::addressof(group)); - which guarantees synchronization before the unique_ptr destructor returns. */ - struct lazy_sync { - void operator()(thread_group* group) const noexcept { - if (group != nullptr) { - group->sync(); - } + //! \return True iff a function was available and executed (on `this_thread`). + bool try_run_one() noexcept { + if (internal) { + return internal->try_run_one(); } - }; + return false; + } - /*! `f` is invoked immediately if the thread_group is empty, otherwise - execution of `f` is queued for next available thread. If `f` is queued, any - exception leaving that function will result in process termination. Use - std::packaged_task if exceptions need to be handled. */ + /*! `f` is invoked immediately if `count() == 0`, otherwise execution of `f` + is queued for next available thread. If `f` is queued, any exception leaving + that function will result in process termination. Use std::packaged_task if + exceptions need to be handled. */ template<typename F> void dispatch(F&& f) { if (internal) { @@ -91,8 +101,11 @@ private: data(std::size_t count); ~data() noexcept; - void sync() noexcept; + std::size_t count() const noexcept { + return threads.size(); + } + bool try_run_one() noexcept; void dispatch(std::function<void()> f); private: @@ -116,13 +129,11 @@ private: void run() noexcept; private: - std::vector<std::thread> threads; + std::vector<boost::thread> threads; node head; node* last; - std::size_t pending; - std::condition_variable has_work; - std::condition_variable finished_work; - std::mutex mutex; + boost::condition_variable has_work; + boost::mutex mutex; bool stop; }; @@ -130,4 +141,5 @@ private: // optionally construct elements, without separate heap allocation boost::optional<data> internal; }; + } |