From 64094e5f4e2e909bdab0d07c3c2d16b7531d9df3 Mon Sep 17 00:00:00 2001 From: Lee Clagett Date: Wed, 2 Nov 2016 16:41:43 -0400 Subject: adding thread_group for managing async tasks --- src/common/thread_group.h | 133 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 src/common/thread_group.h (limited to 'src/common/thread_group.h') diff --git a/src/common/thread_group.h b/src/common/thread_group.h new file mode 100644 index 000000000..d8461d49a --- /dev/null +++ b/src/common/thread_group.h @@ -0,0 +1,133 @@ +// Copyright (c) 2014-2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include +#include +#include +#include +#include +#include +#include + +namespace tools +{ +//! Manages zero or more threads for work dispatching +class thread_group +{ +public: + //! Create `min(count, get_max_concurrency()) - 1` threads + explicit thread_group(std::size_t count); + + thread_group(thread_group const&) = delete; + thread_group(thread_group&&) = delete; + + //! Joins threads, but does not necessarily run all dispatched functions. + ~thread_group() = default; + + thread_group& operator=(thread_group const&) = delete; + thread_group& operator=(thread_group&&) = delete; + + /*! Blocks until all functions provided to `dispatch` complete. Does not + destroy threads. If a dispatched function calls `this->dispatch(...)`, + `this->sync()` will continue to block until that new function completes. */ + void sync() noexcept { + if (internal) { + internal->sync(); + } + } + + /*! Example usage: + std::unique_ptr sync(std::addressof(group)); + which guarantees synchronization before the unique_ptr destructor returns. */ + struct lazy_sync { + void operator()(thread_group* group) const noexcept { + if (group != nullptr) { + group->sync(); + } + } + }; + + /*! `f` is invoked immediately if the thread_group is empty, otherwise + execution of `f` is queued for next available thread. If `f` is queued, any + exception leaving that function will result in process termination. Use + std::packaged_task if exceptions need to be handled. */ + template + void dispatch(F&& f) { + if (internal) { + internal->dispatch(std::forward(f)); + } + else { + f(); + } + } + +private: + class data { + public: + data(std::size_t count); + ~data() noexcept; + + void sync() noexcept; + + void dispatch(std::function f); + + private: + struct work; + + struct node { + node() = delete; + std::unique_ptr ptr; + }; + + struct work { + work() = delete; + std::function f; + node next; + }; + + //! Requires lock on `mutex`. + std::unique_ptr get_next() noexcept; + + //! Blocks until destructor is invoked, only call from thread. + void run() noexcept; + + private: + std::vector threads; + node head; + node* last; + std::size_t pending; + std::condition_variable has_work; + std::condition_variable finished_work; + std::mutex mutex; + bool stop; + }; + +private: + // optionally construct elements, without separate heap allocation + boost::optional internal; +}; +} -- cgit v1.2.3