aboutsummaryrefslogtreecommitdiff
path: root/src/blockchain_db/berkeleydb/db_bdb.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/blockchain_db/berkeleydb/db_bdb.h172
1 files changed, 162 insertions, 10 deletions
diff --git a/src/blockchain_db/berkeleydb/db_bdb.h b/src/blockchain_db/berkeleydb/db_bdb.h
index fec4ed24f..41f4bcb78 100644
--- a/src/blockchain_db/berkeleydb/db_bdb.h
+++ b/src/blockchain_db/berkeleydb/db_bdb.h
@@ -30,6 +30,11 @@
#include "blockchain_db/blockchain_db.h"
#include "cryptonote_protocol/blobdatatype.h" // for type blobdata
+#include <unordered_map>
+
+// ND: Enables multi-threaded bulk reads for when getting indices.
+// TODO: Disabled for now, as it doesn't seem to provide noticeable improvements (??. Reason: TBD.
+// #define BDB_BULK_CAN_THREAD
namespace cryptonote
{
@@ -83,10 +88,145 @@ struct bdb_txn_safe
{
return &m_txn;
}
-
+private:
DbTxn* m_txn;
};
+// ND: Class to handle buffer management when doing bulk queries
+// (DB_MULTIPLE). Allocates buffers then handles thread queuing
+// so a fixed set of buffers can be used (instead of allocating
+// every time a bulk query is needed).
+template <typename T>
+class bdb_safe_buffer
+{
+ // limit the number of buffers to 8
+ const size_t MaxAllowedBuffers = 8;
+public:
+ bdb_safe_buffer(size_t num_buffers, size_t count)
+ {
+ if(num_buffers > MaxAllowedBuffers)
+ num_buffers = MaxAllowedBuffers;
+
+ set_count(num_buffers);
+ for (size_t i = 0; i < num_buffers; i++)
+ m_buffers.push_back((T) malloc(sizeof(T) * count));
+ m_buffer_count = count;
+ }
+
+ ~bdb_safe_buffer()
+ {
+ for (size_t i = 0; i < m_buffers.size(); i++)
+ {
+ if (m_buffers[i])
+ {
+ free(m_buffers[i]);
+ m_buffers[i] = nullptr;
+ }
+ }
+
+ m_buffers.resize(0);
+ }
+
+ T acquire_buffer()
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ m_cv.wait(lock, [&]{ return m_count > 0; });
+
+ --m_count;
+ size_t index = -1;
+ for (size_t i = 0; i < m_open_slot.size(); i++)
+ {
+ if (m_open_slot[i])
+ {
+ m_open_slot[i] = false;
+ index = i;
+ break;
+ }
+ }
+
+ assert(index >= 0);
+
+ T buffer = m_buffers[index];
+ m_buffer_map.emplace(buffer, index);
+ return buffer;
+ }
+
+ void release_buffer(T buffer)
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+
+ assert(buffer != nullptr);
+ auto it = m_buffer_map.find(buffer);
+ if (it != m_buffer_map.end())
+ {
+ auto index = it->second;
+
+ assert(index < m_open_slot.size());
+ assert(m_open_slot[index] == false);
+ assert(m_count < m_open_slot.size());
+
+ ++m_count;
+ m_open_slot[index] = true;
+ m_buffer_map.erase(it);
+ m_cv.notify_one();
+ }
+ }
+
+ size_t get_buffer_size() const
+ {
+ return m_buffer_count * sizeof(T);
+ }
+
+ size_t get_buffer_count() const
+ {
+ return m_buffer_count;
+ }
+
+ typedef T type;
+
+private:
+ void set_count(size_t count)
+ {
+ assert(count > 0);
+ m_open_slot.resize(count, true);
+ m_count = count;
+ }
+
+ std::vector<T> m_buffers;
+ std::unordered_map<T, size_t> m_buffer_map;
+
+ std::condition_variable m_cv;
+ std::vector<bool> m_open_slot;
+ size_t m_count;
+ std::mutex m_lock;
+
+ size_t m_buffer_count;
+};
+
+template <typename T>
+class bdb_safe_buffer_autolock
+{
+public:
+ bdb_safe_buffer_autolock(T &safe_buffer, typename T::type &buffer) :
+ m_safe_buffer(safe_buffer), m_buffer(nullptr)
+ {
+ m_buffer = m_safe_buffer.acquire_buffer();
+ buffer = m_buffer;
+ }
+
+ ~bdb_safe_buffer_autolock()
+ {
+ if (m_buffer != nullptr)
+ {
+ m_safe_buffer.release_buffer(m_buffer);
+ m_buffer = nullptr;
+ }
+ }
+private:
+ T &m_safe_buffer;
+ typename T::type m_buffer;
+};
+
class BlockchainBDB : public BlockchainDB
{
public:
@@ -159,8 +299,9 @@ public:
virtual uint64_t get_num_outputs(const uint64_t& amount) const;
- virtual crypto::public_key get_output_key(const uint64_t& amount, const uint64_t& index) const;
-
+ virtual output_data_t get_output_key(const uint64_t& amount, const uint64_t& index);
+ virtual output_data_t get_output_key(const uint64_t& global_index) const;
+ virtual void get_output_key(const uint64_t &amount, const std::vector<uint64_t> &offsets, std::vector<output_data_t> &outputs);
virtual tx_out get_output(const crypto::hash& h, const uint64_t& index) const;
/**
@@ -175,9 +316,11 @@ public:
tx_out get_output(const uint64_t& index) const;
virtual tx_out_index get_output_tx_and_index_from_global(const uint64_t& index) const;
+ virtual void get_output_tx_and_index_from_global(const std::vector<uint64_t> &global_indices,
+ std::vector<tx_out_index> &tx_out_indices) const;
- virtual tx_out_index get_output_tx_and_index(const uint64_t& amount, const uint64_t& index) const;
- virtual void get_output_tx_and_index(const uint64_t& amount, std::vector<uint64_t> &offsets, std::vector<tx_out_index> &indices) const;
+ virtual tx_out_index get_output_tx_and_index(const uint64_t& amount, const uint64_t& index);
+ virtual void get_output_tx_and_index(const uint64_t& amount, const std::vector<uint64_t> &offsets, std::vector<tx_out_index> &indices);
virtual std::vector<uint64_t> get_tx_output_indices(const crypto::hash& h) const;
virtual std::vector<uint64_t> get_tx_amount_output_indices(const crypto::hash& h) const;
@@ -198,7 +341,12 @@ public:
virtual void batch_abort();
virtual void pop_block(block& blk, std::vector<transaction>& txs);
- virtual bool has_bulk_indices() const { return true; }
+
+#if defined(BDB_BULK_CAN_THREAD)
+ virtual bool can_thread_bulk_indices() const { return true; }
+#else
+ virtual bool can_thread_bulk_indices() const { return false; }
+#endif
private:
virtual void add_block( const block& blk
@@ -214,7 +362,7 @@ private:
virtual void remove_transaction_data(const crypto::hash& tx_hash, const transaction& tx);
- virtual void add_output(const crypto::hash& tx_hash, const tx_out& tx_output, const uint64_t& local_index);
+ virtual void add_output(const crypto::hash& tx_hash, const tx_out& tx_output, const uint64_t& local_index, const uint64_t unlock_time);
virtual void remove_output(const tx_out& tx_output);
@@ -227,6 +375,7 @@ private:
virtual void remove_spent_key(const crypto::key_image& k_image);
+ void get_output_global_indices(const uint64_t& amount, const std::vector<uint64_t> &offsets, std::vector<uint64_t> &global_indices);
/**
* @brief convert a tx output to a blob for storage
*
@@ -253,10 +402,13 @@ private:
*
* @return the global index of the desired output
*/
- uint64_t get_output_global_index(const uint64_t& amount, const uint64_t& index) const;
-
+ uint64_t get_output_global_index(const uint64_t& amount, const uint64_t& index);
+ void checkpoint_worker() const;
void check_open() const;
- void *m_buffer;
+ bool m_run_checkpoint;
+ std::unique_ptr<boost::thread> m_checkpoint_thread;
+ typedef bdb_safe_buffer<void *> bdb_safe_buffer_t;
+ bdb_safe_buffer_t m_buffer;
DbEnv* m_env;