aboutsummaryrefslogtreecommitdiff
path: root/src/common/thread_group.cpp
diff options
context:
space:
mode:
authorLee Clagett <code@leeclagett.com>2016-11-21 14:48:42 -0500
committerLee Clagett <code@leeclagett.com>2016-11-23 14:41:25 -0500
commitf025198f195516da8654bece64bf7a1fb85be3cd (patch)
treeef760c61553d5a51d09bcfbb4b02840169ab5c13 /src/common/thread_group.cpp
parentMerge pull request #1346 (diff)
downloadmonero-f025198f195516da8654bece64bf7a1fb85be3cd.tar.xz
Added task_region - a fork/join task implementation
Diffstat (limited to 'src/common/thread_group.cpp')
-rw-r--r--src/common/thread_group.cpp86
1 files changed, 36 insertions, 50 deletions
diff --git a/src/common/thread_group.cpp b/src/common/thread_group.cpp
index aa1b64f2e..4e1cc8964 100644
--- a/src/common/thread_group.cpp
+++ b/src/common/thread_group.cpp
@@ -27,6 +27,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "common/thread_group.h"
+#include <boost/thread/locks.hpp>
#include <cassert>
#include <limits>
#include <stdexcept>
@@ -35,14 +36,20 @@
namespace tools
{
-thread_group::thread_group(std::size_t count) : internal() {
+std::size_t thread_group::optimal() {
static_assert(
std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(),
"unexpected truncation"
);
- count = std::min<std::size_t>(count, get_max_concurrency());
- count = count ? count - 1 : 0;
+ const std::size_t hardware = get_max_concurrency();
+ return hardware ? (hardware - 1) : 0;
+}
+
+std::size_t thread_group::optimal_with_max(std::size_t count) {
+ return count ? std::min(count - 1, optimal()) : 0;
+}
+thread_group::thread_group(std::size_t count) : internal() {
if (count) {
internal.emplace(count);
}
@@ -52,24 +59,21 @@ thread_group::data::data(std::size_t count)
: threads()
, head{nullptr}
, last(std::addressof(head))
- , pending(count)
, mutex()
, has_work()
- , finished_work()
, stop(false) {
threads.reserve(count);
while (count--) {
- threads.push_back(std::thread(&thread_group::data::run, this));
+ threads.push_back(boost::thread(&thread_group::data::run, this));
}
}
thread_group::data::~data() noexcept {
{
- const std::unique_lock<std::mutex> lock(mutex);
+ const boost::unique_lock<boost::mutex> lock(mutex);
stop = true;
}
has_work.notify_all();
- finished_work.notify_all();
for (auto& worker : threads) {
try {
worker.join();
@@ -78,10 +82,20 @@ thread_group::data::~data() noexcept {
}
}
+std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept {
+ std::unique_ptr<work> rc = std::move(head.ptr);
+ if (rc != nullptr) {
+ head.ptr = std::move(rc->next.ptr);
+ if (head.ptr == nullptr) {
+ last = std::addressof(head);
+ }
+ }
+ return rc;
+}
-void thread_group::data::sync() noexcept {
+bool thread_group::data::try_run_one() noexcept {
/* This function and `run()` can both throw when acquiring the lock, or in
- the dispatched function. It is tough to recover from either, particularly the
+ dispatched function. It is tough to recover from either, particularly the
lock case. These functions are marked as noexcept so that if either call
throws, the entire process is terminated. Users of the `dispatch` call are
expected to make their functions noexcept, or use std::packaged_task to copy
@@ -89,50 +103,25 @@ void thread_group::data::sync() noexcept {
cases (std::bad_alloc). This was the existing behavior;
`asio::io_service::run` propogates errors from dispatched calls, and uncaught
exceptions on threads result in process termination. */
- assert(!threads.empty());
- bool not_first = false;
- while (true) {
- std::unique_ptr<work> next = nullptr;
- {
- std::unique_lock<std::mutex> lock(mutex);
- pending -= std::size_t(not_first);
- not_first = true;
- finished_work.notify_all();
-
- if (stop) {
- return;
- }
-
- next = get_next();
- if (next == nullptr) {
- finished_work.wait(lock, [this] { return pending == 0 || stop; });
- return;
- }
- }
+ std::unique_ptr<work> next = nullptr;
+ {
+ const boost::unique_lock<boost::mutex> lock(mutex);
+ next = get_next();
+ }
+ if (next) {
assert(next->f);
next->f();
+ return true;
}
-}
-
-std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept {
- std::unique_ptr<work> rc = std::move(head.ptr);
- if (rc != nullptr) {
- head.ptr = std::move(rc->next.ptr);
- if (head.ptr == nullptr) {
- last = std::addressof(head);
- }
- }
- return rc;
+ return false;
}
void thread_group::data::run() noexcept {
- // see `sync()` source for additional information
+ // see `try_run_one()` source for additional information
while (true) {
std::unique_ptr<work> next = nullptr;
{
- std::unique_lock<std::mutex> lock(mutex);
- --pending;
- finished_work.notify_all();
+ boost::unique_lock<boost::mutex> lock(mutex);
has_work.wait(lock, [this] { return head.ptr != nullptr || stop; });
if (stop) {
return;
@@ -149,15 +138,12 @@ void thread_group::data::dispatch(std::function<void()> f) {
std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}});
node* const latest_node = std::addressof(latest->next);
{
- const std::unique_lock<std::mutex> lock(mutex);
+ const boost::unique_lock<boost::mutex> lock(mutex);
assert(last != nullptr);
assert(last->ptr == nullptr);
- if (pending == std::numeric_limits<std::size_t>::max()) {
- throw std::overflow_error("thread_group exceeded max queue depth");
- }
+
last->ptr = std::move(latest);
last = latest_node;
- ++pending;
}
has_work.notify_one();
}