From 64094e5f4e2e909bdab0d07c3c2d16b7531d9df3 Mon Sep 17 00:00:00 2001 From: Lee Clagett Date: Wed, 2 Nov 2016 16:41:43 -0400 Subject: adding thread_group for managing async tasks --- src/ringct/rctSigs.cpp | 148 +++++++++++++++++-------------------------------- 1 file changed, 50 insertions(+), 98 deletions(-) (limited to 'src/ringct') diff --git a/src/ringct/rctSigs.cpp b/src/ringct/rctSigs.cpp index 19e9d291e..df33c26b2 100644 --- a/src/ringct/rctSigs.cpp +++ b/src/ringct/rctSigs.cpp @@ -28,9 +28,9 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#include #include "misc_log_ex.h" #include "common/perf_timer.h" +#include "common/thread_group.h" #include "common/util.h" #include "rctSigs.h" #include "cryptonote_core/cryptonote_format_utils.h" @@ -38,17 +38,22 @@ using namespace crypto; using namespace std; -#define KILL_IOSERVICE() \ - if(ioservice_active) \ - { \ - work.reset(); \ - while (!ioservice.stopped()) ioservice.poll(); \ - threadpool.join_all(); \ - ioservice.stop(); \ - ioservice_active = false; \ +namespace rct { + namespace { + struct verRangeWrapper_ { + void operator()(const key & C, const rangeSig & as, bool &result) const { + result = verRange(C, as); } + }; + constexpr const verRangeWrapper_ verRangeWrapper{}; -namespace rct { + struct verRctMGSimpleWrapper_ { + void operator()(const key &message, const mgSig &mg, const ctkeyV & pubs, const key & C, bool &result) const { + result = verRctMGSimple(message, mg, pubs, C); + } + }; + constexpr const verRctMGSimpleWrapper_ verRctMGSimpleWrapper{}; + } //Schnorr Non-linkable //Gen Gives a signature (L1, s1, s2) proving that the sender knows "x" such that xG = one of P1 or P2 @@ -360,10 +365,6 @@ namespace rct { return true; } - void verRangeWrapper(const key & C, const rangeSig & as, bool &result) { - result = verRange(C, as); - } - key get_pre_mlsag_hash(const rctSig &rv) { keyV hashes; @@ -544,9 +545,6 @@ namespace rct { return MLSAG_Ver(message, M, mg, rows); } - void verRctMGSimpleWrapper(const key &message, const mgSig &mg, const ctkeyV & pubs, const key & C, bool &result) { - result = verRctMGSimple(message, mg, pubs, C); - } //These functions get keys from blockchain //replace these when connecting blockchain @@ -767,38 +765,20 @@ namespace rct { // some rct ops can throw try { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr work(new boost::asio::io_service::work(ioservice)); - size_t threads = tools::get_max_concurrency(); - threads = std::min(threads, rv.outPk.size()); - for (size_t i = 0; i < threads; ++i) - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - bool ioservice_active = true; std::deque results(rv.outPk.size(), false); - epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); }); + tools::thread_group threadpool(rv.outPk.size()); // this must destruct before results DP("range proofs verified?"); for (size_t i = 0; i < rv.outPk.size(); i++) { - if (threads > 1) { - ioservice.dispatch(boost::bind(&verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i]))); - } - else { - bool tmp = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); - DP(tmp); - if (!tmp) { - LOG_ERROR("Range proof verification failed for input " << i); - return false; - } - } + threadpool.dispatch( + std::bind(verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i])) + ); } - KILL_IOSERVICE(); - if (threads > 1) { - for (size_t i = 0; i < rv.outPk.size(); ++i) { - if (!results[i]) { - LOG_ERROR("Range proof verified failed for input " << i); - return false; - } + threadpool.sync(); + for (size_t i = 0; i < rv.outPk.size(); ++i) { + if (!results[i]) { + LOG_ERROR("Range proof verified failed for input " << i); + return false; } } @@ -832,34 +812,23 @@ namespace rct { CHECK_AND_ASSERT_MES(rv.pseudoOuts.size() == rv.p.MGs.size(), false, "Mismatched sizes of rv.pseudoOuts and rv.p.MGs"); CHECK_AND_ASSERT_MES(rv.pseudoOuts.size() == rv.mixRing.size(), false, "Mismatched sizes of rv.pseudoOuts and mixRing"); + const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size()); + tools::thread_group threadpool(threads); { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr work(new boost::asio::io_service::work(ioservice)); - size_t threads = tools::get_max_concurrency(); - threads = std::min(threads, rv.outPk.size()); - for (size_t i = 0; i < threads; ++i) - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - bool ioservice_active = true; std::deque results(rv.outPk.size(), false); - epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); }); - - for (i = 0; i < rv.outPk.size(); i++) { - if (threads > 1) { - ioservice.dispatch(boost::bind(&verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i]))); - } - else if (!verRange(rv.outPk[i].mask, rv.p.rangeSigs[i])) { - LOG_ERROR("Range proof verified failed for input " << i); - return false; + { + const std::unique_ptr + sync(std::addressof(threadpool)); + for (i = 0; i < rv.outPk.size(); i++) { + threadpool.dispatch( + std::bind(verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i])) + ); } - } - KILL_IOSERVICE(); - if (threads > 1) { - for (size_t i = 0; i < rv.outPk.size(); ++i) { - if (!results[i]) { - LOG_ERROR("Range proof verified failed for input " << i); - return false; - } + } // threadpool.sync(); + for (size_t i = 0; i < rv.outPk.size(); ++i) { + if (!results[i]) { + LOG_ERROR("Range proof verified failed for input " << i); + return false; } } } @@ -875,37 +844,20 @@ namespace rct { key message = get_pre_mlsag_hash(rv); { - boost::asio::io_service ioservice; - boost::thread_group threadpool; - std::unique_ptr work(new boost::asio::io_service::work(ioservice)); - size_t threads = tools::get_max_concurrency(); - threads = std::min(threads, rv.mixRing.size()); - for (size_t i = 0; i < threads; ++i) - threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice)); - bool ioservice_active = true; std::deque results(rv.mixRing.size(), false); - epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); }); - - for (i = 0 ; i < rv.mixRing.size() ; i++) { - if (threads > 1) { - ioservice.dispatch(boost::bind(&verRctMGSimpleWrapper, std::cref(message), std::cref(rv.p.MGs[i]), std::cref(rv.mixRing[i]), std::cref(rv.pseudoOuts[i]), std::ref(results[i]))); + { + const std::unique_ptr + sync(std::addressof(threadpool)); + for (i = 0 ; i < rv.mixRing.size() ; i++) { + threadpool.dispatch( + std::bind(verRctMGSimpleWrapper, std::cref(message), std::cref(rv.p.MGs[i]), std::cref(rv.mixRing[i]), std::cref(rv.pseudoOuts[i]), std::ref(results[i])) + ); } - else { - bool tmpb = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], rv.pseudoOuts[i]); - DP(tmpb); - if (!tmpb) { - LOG_ERROR("verRctMGSimple failed for input " << i); - return false; - } - } - } - KILL_IOSERVICE(); - if (threads > 1) { - for (size_t i = 0; i < results.size(); ++i) { - if (!results[i]) { - LOG_ERROR("verRctMGSimple failed for input " << i); - return false; - } + } // threadpool.sync(); + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i]) { + LOG_ERROR("verRctMGSimple failed for input " << i); + return false; } } } -- cgit v1.2.3