diff options
author | Riccardo Spagni <ric@spagni.net> | 2016-11-08 22:37:43 +0200 |
---|---|---|
committer | Riccardo Spagni <ric@spagni.net> | 2016-11-08 22:37:43 +0200 |
commit | dce47d52af614b339c1068fe76d815cb93faca30 (patch) | |
tree | 497a2476daca11e9a4b4532740a73b218e74ea0d /src/common | |
parent | Merge pull request #1287 (diff) | |
parent | adding thread_group for managing async tasks (diff) | |
download | monero-dce47d52af614b339c1068fe76d815cb93faca30.tar.xz |
Merge pull request #1291
64094e5 adding thread_group for managing async tasks (Lee Clagett)
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/common/thread_group.cpp | 164 | ||||
-rw-r--r-- | src/common/thread_group.h | 133 |
3 files changed, 301 insertions, 2 deletions
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 4b6149cbd..d5d22bca6 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -32,7 +32,8 @@ set(common_sources dns_utils.cpp util.cpp i18n.cpp - perf_timer.cpp) + perf_timer.cpp + thread_group.cpp) if (STACK_TRACE) list(APPEND common_sources stack_trace.cpp) @@ -55,7 +56,8 @@ set(common_private_headers varint.h i18n.h perf_timer.h - stack_trace.h) + stack_trace.h + thread_group.h) monero_private_headers(common ${common_private_headers}) diff --git a/src/common/thread_group.cpp b/src/common/thread_group.cpp new file mode 100644 index 000000000..ece268ab7 --- /dev/null +++ b/src/common/thread_group.cpp @@ -0,0 +1,164 @@ +// Copyright (c) 2014-2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include "common/thread_group.h" + +#include <cassert> +#include <limits> +#include <stdexcept> + +#include "common/util.h" + +namespace tools +{ +thread_group::thread_group(std::size_t count) : internal() { + static_assert( + std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(), + "unexpected truncation" + ); + count = std::min<std::size_t>(count, get_max_concurrency()); + count = count ? count - 1 : 0; + + if (count) { + internal.emplace(count); + } +} + +thread_group::data::data(std::size_t count) + : threads() + , head{nullptr} + , last(std::addressof(head)) + , pending(count) + , mutex() + , has_work() + , finished_work() + , stop(false) { + threads.reserve(count); + while (count--) { + threads.push_back(std::thread(&thread_group::data::run, this)); + } +} + +thread_group::data::~data() noexcept { + { + const std::unique_lock<std::mutex> lock(mutex); + stop = true; + } + has_work.notify_all(); + finished_work.notify_all(); + for (auto& worker : threads) { + try { + worker.join(); + } + catch(...) {} + } +} + + +void thread_group::data::sync() noexcept { + /* This function and `run()` can both throw when acquiring the lock, or in + the dispatched function. It is tough to recover from either, particularly the + lock case. These functions are marked as noexcept so that if either call + throws, the entire process is terminated. Users of the `dispatch` call are + expected to make their functions noexcept, or use std::packaged_task to copy + exceptions so that the process will continue in all but the most pessimistic + cases (std::bad_alloc). This was the existing behavior; + `asio::io_service::run` propogates errors from dispatched calls, and uncaught + exceptions on threads result in process termination. */ + assert(!threads.empty()); + bool not_first = false; + while (true) { + std::unique_ptr<work> next = nullptr; + { + std::unique_lock<std::mutex> lock(mutex); + pending -= std::size_t(not_first); + not_first = true; + finished_work.notify_all(); + + if (stop) { + return; + } + + next = get_next(); + if (next == nullptr) { + finished_work.wait(lock, [this] { return pending == 0 || stop; }); + return; + } + } + assert(next->f); + next->f(); + } +} + +std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { + std::unique_ptr<work> rc = std::move(head.ptr); + if (rc != nullptr) { + head.ptr = std::move(rc->next.ptr); + if (head.ptr == nullptr) { + last = std::addressof(head); + } + } + return rc; +} + +void thread_group::data::run() noexcept { + // see `sync()` source for additional information + while (true) { + std::unique_ptr<work> next = nullptr; + { + std::unique_lock<std::mutex> lock(mutex); + --pending; + finished_work.notify_all(); + has_work.wait(lock, [this] { return head.ptr != nullptr || stop; }); + if (stop) { + return; + } + next = get_next(); + } + assert(next != nullptr); + assert(next->f); + next->f(); + } +} + +void thread_group::data::dispatch(std::function<void()> f) { + std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}}); + node* const latest_node = std::addressof(latest->next); + { + const std::unique_lock<std::mutex> lock(mutex); + assert(last != nullptr); + assert(last->next == nullptr); + if (pending == std::numeric_limits<std::size_t>::max()) { + throw std::overflow_error("thread_group exceeded max queue depth"); + } + last->ptr = std::move(latest); + last = latest_node; + ++pending; + } + has_work.notify_one(); +} +} diff --git a/src/common/thread_group.h b/src/common/thread_group.h new file mode 100644 index 000000000..d8461d49a --- /dev/null +++ b/src/common/thread_group.h @@ -0,0 +1,133 @@ +// Copyright (c) 2014-2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include <boost/optional/optional.hpp> +#include <condition_variable> +#include <cstddef> +#include <functional> +#include <thread> +#include <utility> +#include <vector> + +namespace tools +{ +//! Manages zero or more threads for work dispatching +class thread_group +{ +public: + //! Create `min(count, get_max_concurrency()) - 1` threads + explicit thread_group(std::size_t count); + + thread_group(thread_group const&) = delete; + thread_group(thread_group&&) = delete; + + //! Joins threads, but does not necessarily run all dispatched functions. + ~thread_group() = default; + + thread_group& operator=(thread_group const&) = delete; + thread_group& operator=(thread_group&&) = delete; + + /*! Blocks until all functions provided to `dispatch` complete. Does not + destroy threads. If a dispatched function calls `this->dispatch(...)`, + `this->sync()` will continue to block until that new function completes. */ + void sync() noexcept { + if (internal) { + internal->sync(); + } + } + + /*! Example usage: + std::unique_ptr<thread_group, thread_group::lazy_sync> sync(std::addressof(group)); + which guarantees synchronization before the unique_ptr destructor returns. */ + struct lazy_sync { + void operator()(thread_group* group) const noexcept { + if (group != nullptr) { + group->sync(); + } + } + }; + + /*! `f` is invoked immediately if the thread_group is empty, otherwise + execution of `f` is queued for next available thread. If `f` is queued, any + exception leaving that function will result in process termination. Use + std::packaged_task if exceptions need to be handled. */ + template<typename F> + void dispatch(F&& f) { + if (internal) { + internal->dispatch(std::forward<F>(f)); + } + else { + f(); + } + } + +private: + class data { + public: + data(std::size_t count); + ~data() noexcept; + + void sync() noexcept; + + void dispatch(std::function<void()> f); + + private: + struct work; + + struct node { + node() = delete; + std::unique_ptr<work> ptr; + }; + + struct work { + work() = delete; + std::function<void()> f; + node next; + }; + + //! Requires lock on `mutex`. + std::unique_ptr<work> get_next() noexcept; + + //! Blocks until destructor is invoked, only call from thread. + void run() noexcept; + + private: + std::vector<std::thread> threads; + node head; + node* last; + std::size_t pending; + std::condition_variable has_work; + std::condition_variable finished_work; + std::mutex mutex; + bool stop; + }; + +private: + // optionally construct elements, without separate heap allocation + boost::optional<data> internal; +}; +} |