aboutsummaryrefslogblamecommitdiff
path: root/src/common/thread_group.h
blob: 62e82d8320ca75509e16e7cf3c35297ae152ca6a (plain) (tree)


























                                                                                          

            
                                      


                                              







                     
                                                      


                  










                                                                     










                                                                           

                                                      
                   
                               
     
             

   



                                                                                
     

                 
 



                                                                              















                                             


                                        
 
                                






















                                                                  
                                       

               

                                       






                                                                    
 
 
// Copyright (c) 2014-2016, The Monero Project
// 
// All rights reserved.
// 
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
// 
// 1. Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the following disclaimer.
// 
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
//    of conditions and the following disclaimer in the documentation and/or other
//    materials provided with the distribution.
// 
// 3. Neither the name of the copyright holder nor the names of its contributors may be
//    used to endorse or promote products derived from this software without specific
//    prior written permission.
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once

#include <boost/optional/optional.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

namespace tools 
{
//! Manages zero or more threads for work dispatching.
class thread_group
{
public:

  //! \return `get_max_concurrency() ? get_max_concurrency() - 1 : 0`
  static std::size_t optimal();

  //! \return `count ? min(count - 1, optimal()) : 0`
  static std::size_t optimal_with_max(std::size_t count);

  //! Create an optimal number of threads.
  explicit thread_group() : thread_group(optimal()) {}

  //! Create exactly `count` threads.
  explicit thread_group(std::size_t count);

  thread_group(thread_group const&) = delete;
  thread_group(thread_group&&) = delete;

  //! Joins threads, but does not necessarily run all dispatched functions.
  ~thread_group() = default;

  thread_group& operator=(thread_group const&) = delete;
  thread_group& operator=(thread_group&&) = delete;

  //! \return Number of threads owned by `this` group.
  std::size_t count() const noexcept {
    if (internal) {
      return internal->count();
    }
    return 0;
  }

  //! \return True iff a function was available and executed (on `this_thread`).
  bool try_run_one() noexcept {
    if (internal) {
      return internal->try_run_one();
    }
    return false;
  }

  /*! `f` is invoked immediately if `count() == 0`, otherwise execution of `f`
  is queued for next available thread. If `f` is queued, any exception leaving
  that function will result in process termination. Use std::packaged_task if
  exceptions need to be handled. */
  template<typename F>
  void dispatch(F&& f) {
    if (internal) {
      internal->dispatch(std::forward<F>(f));
    }
    else {
      f();
    }
  }

private:
  class data {
  public:
    data(std::size_t count);
    ~data() noexcept;

    std::size_t count() const noexcept {
      return threads.size();
    }

    bool try_run_one() noexcept;
    void dispatch(std::function<void()> f);

  private:
    struct work;

    struct node {
      node() = delete;
      std::unique_ptr<work> ptr;
    };

    struct work {
      work() = delete;
      std::function<void()> f;
      node next;
    };

    //! Requires lock on `mutex`.
    std::unique_ptr<work> get_next() noexcept;

    //! Blocks until destructor is invoked, only call from thread.
    void run() noexcept;

  private:
    std::vector<boost::thread> threads;
    node head;
    node* last;
    boost::condition_variable has_work;
    boost::mutex mutex;
    bool stop;
  };

private:
  // optionally construct elements, without separate heap allocation
  boost::optional<data> internal;
};

}