diff options
Diffstat (limited to '')
-rw-r--r-- | src/blockchain_db/berkeleydb/db_bdb.h | 172 |
1 files changed, 162 insertions, 10 deletions
diff --git a/src/blockchain_db/berkeleydb/db_bdb.h b/src/blockchain_db/berkeleydb/db_bdb.h index fec4ed24f..41f4bcb78 100644 --- a/src/blockchain_db/berkeleydb/db_bdb.h +++ b/src/blockchain_db/berkeleydb/db_bdb.h @@ -30,6 +30,11 @@ #include "blockchain_db/blockchain_db.h" #include "cryptonote_protocol/blobdatatype.h" // for type blobdata +#include <unordered_map> + +// ND: Enables multi-threaded bulk reads for when getting indices. +// TODO: Disabled for now, as it doesn't seem to provide noticeable improvements (??. Reason: TBD. +// #define BDB_BULK_CAN_THREAD namespace cryptonote { @@ -83,10 +88,145 @@ struct bdb_txn_safe { return &m_txn; } - +private: DbTxn* m_txn; }; +// ND: Class to handle buffer management when doing bulk queries +// (DB_MULTIPLE). Allocates buffers then handles thread queuing +// so a fixed set of buffers can be used (instead of allocating +// every time a bulk query is needed). +template <typename T> +class bdb_safe_buffer +{ + // limit the number of buffers to 8 + const size_t MaxAllowedBuffers = 8; +public: + bdb_safe_buffer(size_t num_buffers, size_t count) + { + if(num_buffers > MaxAllowedBuffers) + num_buffers = MaxAllowedBuffers; + + set_count(num_buffers); + for (size_t i = 0; i < num_buffers; i++) + m_buffers.push_back((T) malloc(sizeof(T) * count)); + m_buffer_count = count; + } + + ~bdb_safe_buffer() + { + for (size_t i = 0; i < m_buffers.size(); i++) + { + if (m_buffers[i]) + { + free(m_buffers[i]); + m_buffers[i] = nullptr; + } + } + + m_buffers.resize(0); + } + + T acquire_buffer() + { + std::unique_lock<std::mutex> lock(m_lock); + m_cv.wait(lock, [&]{ return m_count > 0; }); + + --m_count; + size_t index = -1; + for (size_t i = 0; i < m_open_slot.size(); i++) + { + if (m_open_slot[i]) + { + m_open_slot[i] = false; + index = i; + break; + } + } + + assert(index >= 0); + + T buffer = m_buffers[index]; + m_buffer_map.emplace(buffer, index); + return buffer; + } + + void release_buffer(T buffer) + { + std::unique_lock<std::mutex> lock(m_lock); + + assert(buffer != nullptr); + auto it = m_buffer_map.find(buffer); + if (it != m_buffer_map.end()) + { + auto index = it->second; + + assert(index < m_open_slot.size()); + assert(m_open_slot[index] == false); + assert(m_count < m_open_slot.size()); + + ++m_count; + m_open_slot[index] = true; + m_buffer_map.erase(it); + m_cv.notify_one(); + } + } + + size_t get_buffer_size() const + { + return m_buffer_count * sizeof(T); + } + + size_t get_buffer_count() const + { + return m_buffer_count; + } + + typedef T type; + +private: + void set_count(size_t count) + { + assert(count > 0); + m_open_slot.resize(count, true); + m_count = count; + } + + std::vector<T> m_buffers; + std::unordered_map<T, size_t> m_buffer_map; + + std::condition_variable m_cv; + std::vector<bool> m_open_slot; + size_t m_count; + std::mutex m_lock; + + size_t m_buffer_count; +}; + +template <typename T> +class bdb_safe_buffer_autolock +{ +public: + bdb_safe_buffer_autolock(T &safe_buffer, typename T::type &buffer) : + m_safe_buffer(safe_buffer), m_buffer(nullptr) + { + m_buffer = m_safe_buffer.acquire_buffer(); + buffer = m_buffer; + } + + ~bdb_safe_buffer_autolock() + { + if (m_buffer != nullptr) + { + m_safe_buffer.release_buffer(m_buffer); + m_buffer = nullptr; + } + } +private: + T &m_safe_buffer; + typename T::type m_buffer; +}; + class BlockchainBDB : public BlockchainDB { public: @@ -159,8 +299,9 @@ public: virtual uint64_t get_num_outputs(const uint64_t& amount) const; - virtual crypto::public_key get_output_key(const uint64_t& amount, const uint64_t& index) const; - + virtual output_data_t get_output_key(const uint64_t& amount, const uint64_t& index); + virtual output_data_t get_output_key(const uint64_t& global_index) const; + virtual void get_output_key(const uint64_t &amount, const std::vector<uint64_t> &offsets, std::vector<output_data_t> &outputs); virtual tx_out get_output(const crypto::hash& h, const uint64_t& index) const; /** @@ -175,9 +316,11 @@ public: tx_out get_output(const uint64_t& index) const; virtual tx_out_index get_output_tx_and_index_from_global(const uint64_t& index) const; + virtual void get_output_tx_and_index_from_global(const std::vector<uint64_t> &global_indices, + std::vector<tx_out_index> &tx_out_indices) const; - virtual tx_out_index get_output_tx_and_index(const uint64_t& amount, const uint64_t& index) const; - virtual void get_output_tx_and_index(const uint64_t& amount, std::vector<uint64_t> &offsets, std::vector<tx_out_index> &indices) const; + virtual tx_out_index get_output_tx_and_index(const uint64_t& amount, const uint64_t& index); + virtual void get_output_tx_and_index(const uint64_t& amount, const std::vector<uint64_t> &offsets, std::vector<tx_out_index> &indices); virtual std::vector<uint64_t> get_tx_output_indices(const crypto::hash& h) const; virtual std::vector<uint64_t> get_tx_amount_output_indices(const crypto::hash& h) const; @@ -198,7 +341,12 @@ public: virtual void batch_abort(); virtual void pop_block(block& blk, std::vector<transaction>& txs); - virtual bool has_bulk_indices() const { return true; } + +#if defined(BDB_BULK_CAN_THREAD) + virtual bool can_thread_bulk_indices() const { return true; } +#else + virtual bool can_thread_bulk_indices() const { return false; } +#endif private: virtual void add_block( const block& blk @@ -214,7 +362,7 @@ private: virtual void remove_transaction_data(const crypto::hash& tx_hash, const transaction& tx); - virtual void add_output(const crypto::hash& tx_hash, const tx_out& tx_output, const uint64_t& local_index); + virtual void add_output(const crypto::hash& tx_hash, const tx_out& tx_output, const uint64_t& local_index, const uint64_t unlock_time); virtual void remove_output(const tx_out& tx_output); @@ -227,6 +375,7 @@ private: virtual void remove_spent_key(const crypto::key_image& k_image); + void get_output_global_indices(const uint64_t& amount, const std::vector<uint64_t> &offsets, std::vector<uint64_t> &global_indices); /** * @brief convert a tx output to a blob for storage * @@ -253,10 +402,13 @@ private: * * @return the global index of the desired output */ - uint64_t get_output_global_index(const uint64_t& amount, const uint64_t& index) const; - + uint64_t get_output_global_index(const uint64_t& amount, const uint64_t& index); + void checkpoint_worker() const; void check_open() const; - void *m_buffer; + bool m_run_checkpoint; + std::unique_ptr<boost::thread> m_checkpoint_thread; + typedef bdb_safe_buffer<void *> bdb_safe_buffer_t; + bdb_safe_buffer_t m_buffer; DbEnv* m_env; |