diff options
Diffstat (limited to '')
-rw-r--r-- | src/common/thread_group.cpp | 86 |
1 files changed, 36 insertions, 50 deletions
diff --git a/src/common/thread_group.cpp b/src/common/thread_group.cpp index aa1b64f2e..4e1cc8964 100644 --- a/src/common/thread_group.cpp +++ b/src/common/thread_group.cpp @@ -27,6 +27,7 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "common/thread_group.h" +#include <boost/thread/locks.hpp> #include <cassert> #include <limits> #include <stdexcept> @@ -35,14 +36,20 @@ namespace tools { -thread_group::thread_group(std::size_t count) : internal() { +std::size_t thread_group::optimal() { static_assert( std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(), "unexpected truncation" ); - count = std::min<std::size_t>(count, get_max_concurrency()); - count = count ? count - 1 : 0; + const std::size_t hardware = get_max_concurrency(); + return hardware ? (hardware - 1) : 0; +} + +std::size_t thread_group::optimal_with_max(std::size_t count) { + return count ? std::min(count - 1, optimal()) : 0; +} +thread_group::thread_group(std::size_t count) : internal() { if (count) { internal.emplace(count); } @@ -52,24 +59,21 @@ thread_group::data::data(std::size_t count) : threads() , head{nullptr} , last(std::addressof(head)) - , pending(count) , mutex() , has_work() - , finished_work() , stop(false) { threads.reserve(count); while (count--) { - threads.push_back(std::thread(&thread_group::data::run, this)); + threads.push_back(boost::thread(&thread_group::data::run, this)); } } thread_group::data::~data() noexcept { { - const std::unique_lock<std::mutex> lock(mutex); + const boost::unique_lock<boost::mutex> lock(mutex); stop = true; } has_work.notify_all(); - finished_work.notify_all(); for (auto& worker : threads) { try { worker.join(); @@ -78,10 +82,20 @@ thread_group::data::~data() noexcept { } } +std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { + std::unique_ptr<work> rc = std::move(head.ptr); + if (rc != nullptr) { + head.ptr = std::move(rc->next.ptr); + if (head.ptr == nullptr) { + last = std::addressof(head); + } + } + return rc; +} -void thread_group::data::sync() noexcept { +bool thread_group::data::try_run_one() noexcept { /* This function and `run()` can both throw when acquiring the lock, or in - the dispatched function. It is tough to recover from either, particularly the + dispatched function. It is tough to recover from either, particularly the lock case. These functions are marked as noexcept so that if either call throws, the entire process is terminated. Users of the `dispatch` call are expected to make their functions noexcept, or use std::packaged_task to copy @@ -89,50 +103,25 @@ void thread_group::data::sync() noexcept { cases (std::bad_alloc). This was the existing behavior; `asio::io_service::run` propogates errors from dispatched calls, and uncaught exceptions on threads result in process termination. */ - assert(!threads.empty()); - bool not_first = false; - while (true) { - std::unique_ptr<work> next = nullptr; - { - std::unique_lock<std::mutex> lock(mutex); - pending -= std::size_t(not_first); - not_first = true; - finished_work.notify_all(); - - if (stop) { - return; - } - - next = get_next(); - if (next == nullptr) { - finished_work.wait(lock, [this] { return pending == 0 || stop; }); - return; - } - } + std::unique_ptr<work> next = nullptr; + { + const boost::unique_lock<boost::mutex> lock(mutex); + next = get_next(); + } + if (next) { assert(next->f); next->f(); + return true; } -} - -std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept { - std::unique_ptr<work> rc = std::move(head.ptr); - if (rc != nullptr) { - head.ptr = std::move(rc->next.ptr); - if (head.ptr == nullptr) { - last = std::addressof(head); - } - } - return rc; + return false; } void thread_group::data::run() noexcept { - // see `sync()` source for additional information + // see `try_run_one()` source for additional information while (true) { std::unique_ptr<work> next = nullptr; { - std::unique_lock<std::mutex> lock(mutex); - --pending; - finished_work.notify_all(); + boost::unique_lock<boost::mutex> lock(mutex); has_work.wait(lock, [this] { return head.ptr != nullptr || stop; }); if (stop) { return; @@ -149,15 +138,12 @@ void thread_group::data::dispatch(std::function<void()> f) { std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}}); node* const latest_node = std::addressof(latest->next); { - const std::unique_lock<std::mutex> lock(mutex); + const boost::unique_lock<boost::mutex> lock(mutex); assert(last != nullptr); assert(last->ptr == nullptr); - if (pending == std::numeric_limits<std::size_t>::max()) { - throw std::overflow_error("thread_group exceeded max queue depth"); - } + last->ptr = std::move(latest); last = latest_node; - ++pending; } has_work.notify_one(); } |