diff options
author | warptangent <warptangent@inbox.com> | 2015-02-11 15:55:53 -0800 |
---|---|---|
committer | warptangent <warptangent@inbox.com> | 2015-02-23 00:33:37 -0800 |
commit | 58ecc58be13befb8b5d0e6b847987b2fd9635d2c (patch) | |
tree | a4ef9ca136af6865ef45d200ab98f32609e58386 | |
parent | Improve block and tx processing efficiency by less repeat hashing (diff) | |
download | monero-58ecc58be13befb8b5d0e6b847987b2fd9635d2c.tar.xz |
BlockchainLMDB: Add support for batch transactions
-rw-r--r-- | src/cryptonote_core/BlockchainDB_impl/db_lmdb.cpp | 294 | ||||
-rw-r--r-- | src/cryptonote_core/BlockchainDB_impl/db_lmdb.h | 50 |
2 files changed, 289 insertions, 55 deletions
diff --git a/src/cryptonote_core/BlockchainDB_impl/db_lmdb.cpp b/src/cryptonote_core/BlockchainDB_impl/db_lmdb.cpp index 849ec8f6d..c30a785ed 100644 --- a/src/cryptonote_core/BlockchainDB_impl/db_lmdb.cpp +++ b/src/cryptonote_core/BlockchainDB_impl/db_lmdb.cpp @@ -600,15 +600,23 @@ void BlockchainLMDB::check_open() const BlockchainLMDB::~BlockchainLMDB() { LOG_PRINT_L3("BlockchainLMDB::" << __func__); + + // batch transaction shouldn't be active at this point. If it is, consider it aborted. + if (m_batch_active) + batch_abort(); } -BlockchainLMDB::BlockchainLMDB() +BlockchainLMDB::BlockchainLMDB(bool batch_transactions) { LOG_PRINT_L3("BlockchainLMDB::" << __func__); // initialize folder to something "safe" just in case // someone accidentally misuses this class... m_folder = "thishsouldnotexistbecauseitisgibberish"; m_open = false; + + m_batch_transactions = batch_transactions; + m_write_txn = nullptr; + m_batch_active = false; m_height = 0; } @@ -720,6 +728,13 @@ void BlockchainLMDB::create(const std::string& filename) void BlockchainLMDB::close() { LOG_PRINT_L3("BlockchainLMDB::" << __func__); + if (m_batch_active) + { + LOG_PRINT_L3("close() first calling batch_abort() due to active batch transaction"); + batch_abort(); + } + this->sync(); + // FIXME: not yet thread safe!!! Use with care. mdb_env_close(m_env); } @@ -727,7 +742,13 @@ void BlockchainLMDB::close() void BlockchainLMDB::sync() { LOG_PRINT_L3("BlockchainLMDB::" << __func__); - // LMDB documentation leads me to believe this is unnecessary + + // Does nothing unless LMDB environment was opened with MDB_NOSYNC or in part + // MDB_NOMETASYNC. Force flush to be synchronous. + if (auto result = mdb_env_sync(m_env, true)) + { + throw0(DB_ERROR(std::string("Failed to sync database").append(mdb_strerror(result)).c_str())); + } } void BlockchainLMDB::reset() @@ -867,12 +888,17 @@ uint64_t BlockchainLMDB::get_block_timestamp(const uint64_t& height) const check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> key(height); MDB_val result; - auto get_result = mdb_get(txn, m_block_timestamps, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_block_timestamps, &key, &result); if (get_result == MDB_NOTFOUND) { throw0(DB_ERROR(std::string("Attempt to get timestamp from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- timestamp not in db").c_str())); @@ -880,7 +906,8 @@ uint64_t BlockchainLMDB::get_block_timestamp(const uint64_t& height) const else if (get_result) throw0(DB_ERROR("Error attempting to retrieve a timestamp from the db")); - txn.commit(); + if (! m_batch_active) + txn.commit(); return *(const uint64_t*)result.mv_data; } @@ -904,12 +931,18 @@ size_t BlockchainLMDB::get_block_size(const uint64_t& height) const check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> key(height); MDB_val result; - auto get_result = mdb_get(txn, m_block_sizes, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_block_sizes, &key, &result); if (get_result == MDB_NOTFOUND) { throw0(DB_ERROR(std::string("Attempt to get block size from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- block size not in db").c_str())); @@ -917,7 +950,8 @@ size_t BlockchainLMDB::get_block_size(const uint64_t& height) const else if (get_result) throw0(DB_ERROR("Error attempting to retrieve a block size from the db")); - txn.commit(); + if (! m_batch_active) + txn.commit(); return *(const size_t*)result.mv_data; } @@ -927,12 +961,17 @@ difficulty_type BlockchainLMDB::get_block_cumulative_difficulty(const uint64_t& check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> key(height); MDB_val result; - auto get_result = mdb_get(txn, m_block_diffs, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_block_diffs, &key, &result); if (get_result == MDB_NOTFOUND) { throw0(DB_ERROR(std::string("Attempt to get cumulative difficulty from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- difficulty not in db").c_str())); @@ -940,7 +979,8 @@ difficulty_type BlockchainLMDB::get_block_cumulative_difficulty(const uint64_t& else if (get_result) throw0(DB_ERROR("Error attempting to retrieve a cumulative difficulty from the db")); - txn.commit(); + if (! m_batch_active) + txn.commit(); return *(difficulty_type*)result.mv_data; } @@ -967,12 +1007,18 @@ uint64_t BlockchainLMDB::get_block_already_generated_coins(const uint64_t& heigh check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> key(height); MDB_val result; - auto get_result = mdb_get(txn, m_block_coins, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_block_coins, &key, &result); if (get_result == MDB_NOTFOUND) { throw0(DB_ERROR(std::string("Attempt to get generated coins from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- block size not in db").c_str())); @@ -980,7 +1026,8 @@ uint64_t BlockchainLMDB::get_block_already_generated_coins(const uint64_t& heigh else if (get_result) throw0(DB_ERROR("Error attempting to retrieve a total generated coins from the db")); - txn.commit(); + if (! m_batch_active) + txn.commit(); return *(const uint64_t*)result.mv_data; } @@ -990,20 +1037,28 @@ crypto::hash BlockchainLMDB::get_block_hash_from_height(const uint64_t& height) check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> key(height); MDB_val result; - auto get_result = mdb_get(txn, m_block_hashes, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_block_hashes, &key, &result); if (get_result == MDB_NOTFOUND) { throw0(BLOCK_DNE(std::string("Attempt to get hash from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- hash not in db").c_str())); } else if (get_result) - throw0(DB_ERROR("Error attempting to retrieve a block hash from the db")); + throw0(DB_ERROR(std::string("Error attempting to retrieve a block hash from the db: "). + append(mdb_strerror(get_result)).c_str())); - txn.commit(); + if (! m_batch_active) + txn.commit(); return *(crypto::hash*)result.mv_data; } @@ -1075,6 +1130,10 @@ bool BlockchainLMDB::tx_exists(const crypto::hash& h) const LOG_PRINT_L3("BlockchainLMDB::" << __func__); check_open(); + if (m_batch_active) + { + LOG_PRINT_L0("WARNING: active batch transaction while creating a read-only txn in tx_exists()"); + } txn_safe txn; if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) throw0(DB_ERROR("Failed to create a transaction for the db")); @@ -1120,12 +1179,18 @@ transaction BlockchainLMDB::get_tx(const crypto::hash& h) const check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<crypto::hash> key(h); MDB_val result; - auto get_result = mdb_get(txn, m_txs, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_txs, &key, &result); if (get_result == MDB_NOTFOUND) throw1(TX_DNE(std::string("tx with hash ").append(epee::string_tools::pod_to_hex(h)).append(" not found in db").c_str())); else if (get_result) @@ -1137,6 +1202,8 @@ transaction BlockchainLMDB::get_tx(const crypto::hash& h) const transaction tx; if (!parse_and_validate_tx_from_blob(bd, tx)) throw0(DB_ERROR("Failed to parse tx from blob retrieved from the db")); + if (! m_batch_active) + txn.commit(); return tx; } @@ -1178,13 +1245,27 @@ uint64_t BlockchainLMDB::get_tx_block_height(const crypto::hash& h) const LOG_PRINT_L3("BlockchainLMDB::" << __func__); check_open(); + // If m_batch_active is set, a batch transaction exists beyond this class, + // such as a batch import with verification enabled, or possibly (later) a + // batch network sync. + // + // A regular network sync without batching would be expected to open a new + // read transaction here, as validation is done prior to the write for block + // and tx data. + txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<crypto::hash> key(h); MDB_val result; - auto get_result = mdb_get(txn, m_tx_heights, &key, &result); + auto get_result = mdb_get(*txn_ptr, m_tx_heights, &key, &result); if (get_result == MDB_NOTFOUND) { throw1(TX_DNE(std::string("tx height with hash ").append(epee::string_tools::pod_to_hex(h)).append(" not found in db").c_str())); @@ -1192,6 +1273,9 @@ uint64_t BlockchainLMDB::get_tx_block_height(const crypto::hash& h) const else if (get_result) throw0(DB_ERROR("DB error attempting to fetch tx height from hash")); + if (! m_batch_active) + txn.commit(); + return *(const uint64_t*)result.mv_data; } @@ -1334,13 +1418,18 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index_from_global(const uint64_t& check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } MDB_val_copy<uint64_t> k(index); MDB_val v; - auto get_result = mdb_get(txn, m_output_txs, &k, &v); + auto get_result = mdb_get(*txn_ptr, m_output_txs, &k, &v); if (get_result == MDB_NOTFOUND) throw1(OUTPUT_DNE("output with given index not in db")); else if (get_result) @@ -1348,11 +1437,13 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index_from_global(const uint64_t& crypto::hash tx_hash = *(crypto::hash*)v.mv_data; - get_result = mdb_get(txn, m_output_indices, &k, &v); + get_result = mdb_get(*txn_ptr, m_output_indices, &k, &v); if (get_result == MDB_NOTFOUND) throw1(OUTPUT_DNE("output with given index not in db")); else if (get_result) throw0(DB_ERROR("DB error attempting to fetch output tx index")); + if (! m_batch_active) + txn.commit(); return tx_out_index(tx_hash, *(const uint64_t *)v.mv_data); } @@ -1363,10 +1454,15 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index(const uint64_t& amount, con check_open(); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - - lmdb_cur cur(txn, m_output_amounts); + txn_safe* txn_ptr = &txn; + if (m_batch_active) + txn_ptr = m_write_txn; + else + { + if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + } + lmdb_cur cur(*txn_ptr, m_output_amounts); MDB_val_copy<uint64_t> k(amount); MDB_val v; @@ -1395,7 +1491,8 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index(const uint64_t& amount, con cur.close(); - txn.commit(); + if (! m_batch_active) + txn.commit(); return get_output_tx_and_index_from_global(glob_index); } @@ -1538,6 +1635,85 @@ bool BlockchainLMDB::has_key_image(const crypto::key_image& img) const return false; } +void BlockchainLMDB::batch_start() +{ + LOG_PRINT_L3("BlockchainLMDB::" << __func__); + if (! m_batch_transactions) + throw0(DB_ERROR("batch transactions not enabled")); + if (m_batch_active) + throw0(DB_ERROR("batch transaction already in progress")); + if (m_write_txn) + throw0(DB_ERROR("batch transaction attempted, but m_write_txn already in use")); + check_open(); + // NOTE: need to make sure it's destroyed properly when done + if (mdb_txn_begin(m_env, NULL, 0, m_write_batch_txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + // indicates this transaction is for batch transactions, but not whether it's + // active + m_write_batch_txn.m_batch_txn = true; + m_write_txn = &m_write_batch_txn; + m_batch_active = true; + LOG_PRINT_L3("batch transaction: begin"); +} + +void BlockchainLMDB::batch_commit() +{ + LOG_PRINT_L3("BlockchainLMDB::" << __func__); + if (! m_batch_transactions) + throw0(DB_ERROR("batch transactions not enabled")); + if (! m_batch_active) + throw0(DB_ERROR("batch transaction not in progress")); + check_open(); + LOG_PRINT_L3("batch transaction: committing..."); + m_write_txn->commit(); + LOG_PRINT_L3("batch transaction: committed"); + + if (mdb_txn_begin(m_env, NULL, 0, m_write_batch_txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + if (! m_write_batch_txn.m_batch_txn) + throw0(DB_ERROR("m_write_batch_txn not marked as a batch transaction")); + m_write_txn = &m_write_batch_txn; +} + +void BlockchainLMDB::batch_stop() +{ + LOG_PRINT_L3("BlockchainLMDB::" << __func__); + if (! m_batch_transactions) + throw0(DB_ERROR("batch transactions not enabled")); + if (! m_batch_active) + throw0(DB_ERROR("batch transaction not in progress")); + check_open(); + LOG_PRINT_L3("batch transaction: committing..."); + m_write_txn->commit(); + // for destruction of batch transaction + m_write_txn = nullptr; + m_batch_active = false; + LOG_PRINT_L3("batch transaction: end"); +} + +void BlockchainLMDB::batch_abort() +{ + LOG_PRINT_L3("BlockchainLMDB::" << __func__); + if (! m_batch_transactions) + throw0(DB_ERROR("batch transactions not enabled")); + if (! m_batch_active) + throw0(DB_ERROR("batch transaction not in progress")); + check_open(); + // for destruction of batch transaction + m_write_txn = nullptr; + // explicitly call in case mdb_env_close() (BlockchainLMDB::close()) called before BlockchainLMDB destructor called. + m_write_batch_txn.abort(); + m_batch_active = false; + LOG_PRINT_L3("batch transaction: aborted"); +} + +void BlockchainLMDB::set_batch_transactions(bool batch_transactions) +{ + LOG_PRINT_L3("BlockchainLMDB::" << __func__); + m_batch_transactions = batch_transactions; + LOG_PRINT_L3("batch transactions " << (m_batch_transactions ? "enabled" : "disabled")); +} + uint64_t BlockchainLMDB::add_block( const block& blk , const size_t& block_size , const difficulty_type& cumulative_difficulty @@ -1547,23 +1723,31 @@ uint64_t BlockchainLMDB::add_block( const block& blk { LOG_PRINT_L3("BlockchainLMDB::" << __func__); check_open(); + txn_safe txn; - if (mdb_txn_begin(m_env, NULL, 0, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - m_write_txn = &txn; + if (! m_batch_active) + { + if (mdb_txn_begin(m_env, NULL, 0, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + m_write_txn = &txn; + } uint64_t num_outputs = m_num_outputs; try { BlockchainDB::add_block(blk, block_size, cumulative_difficulty, coins_generated, txs); - m_write_txn = NULL; + if (! m_batch_active) + { + m_write_txn = NULL; - txn.commit(); + txn.commit(); + } } catch (...) { m_num_outputs = num_outputs; - m_write_txn = NULL; + if (! m_batch_active) + m_write_txn = NULL; throw; } @@ -1574,17 +1758,23 @@ void BlockchainLMDB::pop_block(block& blk, std::vector<transaction>& txs) { LOG_PRINT_L3("BlockchainLMDB::" << __func__); txn_safe txn; - if (mdb_txn_begin(m_env, NULL, 0, txn)) - throw0(DB_ERROR("Failed to create a transaction for the db")); - m_write_txn = &txn; + if (! m_batch_active) + { + if (mdb_txn_begin(m_env, NULL, 0, txn)) + throw0(DB_ERROR("Failed to create a transaction for the db")); + m_write_txn = &txn; + } uint64_t num_outputs = m_num_outputs; try { BlockchainDB::pop_block(blk, txs); - m_write_txn = NULL; + if (! m_batch_active) + { + m_write_txn = NULL; - txn.commit(); + txn.commit(); + } } catch (...) { diff --git a/src/cryptonote_core/BlockchainDB_impl/db_lmdb.h b/src/cryptonote_core/BlockchainDB_impl/db_lmdb.h index 29fc52fbd..0b4b5ec1a 100644 --- a/src/cryptonote_core/BlockchainDB_impl/db_lmdb.h +++ b/src/cryptonote_core/BlockchainDB_impl/db_lmdb.h @@ -38,8 +38,23 @@ struct txn_safe txn_safe() : m_txn(NULL) { } ~txn_safe() { - if(m_txn != NULL) + LOG_PRINT_L3("txn_safe: destructor"); + if (m_txn != NULL) { + if (m_batch_txn) // this is a batch txn and should have been handled before this point for safety + { + LOG_PRINT_L0("WARNING: txn_safe: m_txn is a batch txn and it's not NULL in destructor - calling mdb_txn_abort()"); + } + else + { + // Example of when this occurs: a lookup fails, so a read-only txn is + // aborted through this destructor. However, successful read-only txns + // ideally should have been committed when done and not end up here. + // + // NOTE: not sure if this is ever reached for a non-batch write + // transaction, but it's probably not ideal if it did. + LOG_PRINT_L3("txn_safe: m_txn not NULL in destructor - calling mdb_txn_abort()"); + } mdb_txn_abort(m_txn); } } @@ -60,6 +75,24 @@ struct txn_safe m_txn = NULL; } + // This should only be needed for batch transaction which must be ensured to + // be aborted before mdb_env_close, not after. So we can't rely on + // BlockchainLMDB destructor to call txn_safe destructor, as that's too late + // to properly abort, since mdb_env_close would have been called earlier. + void abort() + { + LOG_PRINT_L3("txn_safe: abort()"); + if(m_txn != NULL) + { + mdb_txn_abort(m_txn); + m_txn = NULL; + } + else + { + LOG_PRINT_L0("WARNING: txn_safe: abort() called, but m_txn is NULL"); + } + } + operator MDB_txn*() { return m_txn; @@ -71,13 +104,14 @@ struct txn_safe } MDB_txn* m_txn; + bool m_batch_txn = false; }; class BlockchainLMDB : public BlockchainDB { public: - BlockchainLMDB(); + BlockchainLMDB(bool batch_transactions=false); ~BlockchainLMDB(); virtual void open(const std::string& filename); @@ -177,6 +211,12 @@ public: , const std::vector<transaction>& txs ); + virtual void set_batch_transactions(bool batch_transactions); + virtual void batch_start(); + virtual void batch_commit(); + virtual void batch_stop(); + virtual void batch_abort(); + virtual void pop_block(block& blk, std::vector<transaction>& txs); private: @@ -264,7 +304,11 @@ private: uint64_t m_height; uint64_t m_num_outputs; std::string m_folder; - txn_safe* m_write_txn; + txn_safe* m_write_txn; // may point to either a short-lived txn or a batch txn + txn_safe m_write_batch_txn; // persist batch txn outside of BlockchainLMDB + + bool m_batch_transactions; // support for batch transactions + bool m_batch_active; // whether batch transaction is in progress }; } // namespace cryptonote |