diff options
author | Riccardo Spagni <ric@spagni.net> | 2017-09-18 13:19:26 +0200 |
---|---|---|
committer | Riccardo Spagni <ric@spagni.net> | 2017-09-18 13:19:26 +0200 |
commit | 1a73843ceca12932e57f57caf66033c69f8c0d1f (patch) | |
tree | f210349b2a9b5343876556c4ed379444de9c0fda /src/common/threadpool.cpp | |
parent | Merge pull request #2416 (diff) | |
parent | Tweak concurrency limits (diff) | |
download | monero-1a73843ceca12932e57f57caf66033c69f8c0d1f.tar.xz |
Merge pull request #2446
6d0ca7d1 Tweak concurrency limits (Howard Chu)
510d0d47 Use a threadpool (Howard Chu)
Diffstat (limited to 'src/common/threadpool.cpp')
-rw-r--r-- | src/common/threadpool.cpp | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/src/common/threadpool.cpp b/src/common/threadpool.cpp new file mode 100644 index 000000000..41d0c25e0 --- /dev/null +++ b/src/common/threadpool.cpp @@ -0,0 +1,117 @@ +// Copyright (c) 2017, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include "common/threadpool.h" + +#include <cassert> +#include <limits> +#include <stdexcept> + +#include "cryptonote_config.h" +#include "common/util.h" + +namespace tools +{ +threadpool::threadpool() : running(true), active(0) { + boost::thread::attributes attrs; + attrs.set_stack_size(THREAD_STACK_SIZE); + max = tools::get_max_concurrency() * 2; + size_t i = max; + while(i--) { + threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this))); + } +} + +threadpool::~threadpool() { + { + const boost::unique_lock<boost::mutex> lock(mutex); + running = false; + has_work.notify_all(); + } + for (size_t i = 0; i<threads.size(); i++) { + threads[i].join(); + } +} + +void threadpool::submit(waiter *obj, std::function<void()> f) { + entry e = {obj, f}; + boost::unique_lock<boost::mutex> lock(mutex); + if (active == max && !queue.empty()) { + // if all available threads are already running + // and there's work waiting, just run in current thread + lock.unlock(); + f(); + } else { + if (obj) + obj->inc(); + queue.push_back(e); + has_work.notify_one(); + } +} + +int threadpool::get_max_concurrency() { + return max / 2; +} + +void threadpool::waiter::wait() { + boost::unique_lock<boost::mutex> lock(mt); + while(num) cv.wait(lock); +} + +void threadpool::waiter::inc() { + const boost::unique_lock<boost::mutex> lock(mt); + num++; +} + +void threadpool::waiter::dec() { + const boost::unique_lock<boost::mutex> lock(mt); + num--; + if (!num) + cv.notify_one(); +} + +void threadpool::run() { + boost::unique_lock<boost::mutex> lock(mutex); + while (running) { + entry e; + while(queue.empty() && running) + has_work.wait(lock); + if (!running) break; + + active++; + e = queue.front(); + queue.pop_front(); + lock.unlock(); + e.f(); + + if (e.wo) + e.wo->dec(); + lock.lock(); + active--; + } +} +} |