diff options
Diffstat (limited to '')
-rw-r--r-- | src/rpc/zmq_pub.cpp | 99 |
1 files changed, 85 insertions, 14 deletions
diff --git a/src/rpc/zmq_pub.cpp b/src/rpc/zmq_pub.cpp index 0dffffac6..f45f70e04 100644 --- a/src/rpc/zmq_pub.cpp +++ b/src/rpc/zmq_pub.cpp @@ -48,6 +48,8 @@ #include "cryptonote_basic/events.h" #include "misc_log_ex.h" #include "serialization/json_object.h" +#include "ringct/rctTypes.h" +#include "cryptonote_core/cryptonote_tx_utils.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net.zmq" @@ -57,6 +59,7 @@ namespace constexpr const char txpool_signal[] = "tx_signal"; using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>); + using miner_writer = void(epee::byte_stream&, uint8_t, uint64_t, const crypto::hash&, const crypto::hash&, cryptonote::difficulty_type, uint64_t, uint64_t, const std::vector<cryptonote::tx_block_template_backlog_entry>&); using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>); template<typename F> @@ -116,13 +119,30 @@ namespace const epee::span<const cryptonote::block> blocks; }; + //! Object for miner data serialization + struct miner_data + { + uint8_t major_version; + uint64_t height; + const crypto::hash& prev_id; + const crypto::hash& seed_hash; + cryptonote::difficulty_type diff; + uint64_t median_weight; + uint64_t already_generated_coins; + const std::vector<cryptonote::tx_block_template_backlog_entry>& tx_backlog; + }; + //! Object for "minimal" tx serialization struct minimal_txpool { const cryptonote::transaction& tx; + crypto::hash hash; + uint64_t blob_size; + uint64_t weight; + uint64_t fee; }; - void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain self) + void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain& self) { namespace adapt = boost::adaptors; @@ -143,19 +163,27 @@ namespace dest.EndObject(); } - void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool self) + void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const miner_data& self) { - crypto::hash id{}; - std::size_t blob_size = 0; - if (!get_transaction_hash(self.tx, id, blob_size)) - { - MERROR("ZMQ/Pub failure: get_transaction_hash"); - return; - } + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, major_version, self.major_version); + INSERT_INTO_JSON_OBJECT(dest, height, self.height); + INSERT_INTO_JSON_OBJECT(dest, prev_id, self.prev_id); + INSERT_INTO_JSON_OBJECT(dest, seed_hash, self.seed_hash); + INSERT_INTO_JSON_OBJECT(dest, difficulty, cryptonote::hex(self.diff)); + INSERT_INTO_JSON_OBJECT(dest, median_weight, self.median_weight); + INSERT_INTO_JSON_OBJECT(dest, already_generated_coins, self.already_generated_coins); + INSERT_INTO_JSON_OBJECT(dest, tx_backlog, self.tx_backlog); + dest.EndObject(); + } + void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool& self) + { dest.StartObject(); - INSERT_INTO_JSON_OBJECT(dest, id, id); - INSERT_INTO_JSON_OBJECT(dest, blob_size, blob_size); + INSERT_INTO_JSON_OBJECT(dest, id, self.hash); + INSERT_INTO_JSON_OBJECT(dest, blob_size, self.blob_size); + INSERT_INTO_JSON_OBJECT(dest, weight, self.weight); + INSERT_INTO_JSON_OBJECT(dest, fee, self.fee); dest.EndObject(); } @@ -169,6 +197,11 @@ namespace json_pub(buf, minimal_chain{height, blocks}); } + void json_miner_data(epee::byte_stream& buf, uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, cryptonote::difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<cryptonote::tx_block_template_backlog_entry>& tx_backlog) + { + json_pub(buf, miner_data{major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog}); + } + // boost::adaptors are in place "views" - no copy/move takes place // moving transactions (via sort, etc.), is expensive! @@ -187,7 +220,7 @@ namespace namespace adapt = boost::adaptors; const auto to_minimal_tx = [](const cryptonote::txpool_event& event) { - return minimal_txpool{event.tx}; + return minimal_txpool{event.tx, event.hash, event.blob_size, event.weight, cryptonote::get_tx_fee(event.tx)}; }; json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_minimal_tx))); } @@ -198,6 +231,11 @@ namespace {u8"json-minimal-chain_main", json_minimal_chain} }}; + constexpr const std::array<context<miner_writer>, 1> miner_contexts = + {{ + {u8"json-full-miner_data", json_miner_data}, + }}; + constexpr const std::array<context<txpool_writer>, 2> txpool_contexts = {{ {u8"json-full-txpool_add", json_full_txpool}, @@ -321,6 +359,7 @@ namespace cryptonote { namespace listener zmq_pub::zmq_pub(void* context) : relay_(), chain_subs_{{0}}, + miner_subs_{{0}}, txpool_subs_{{0}}, sync_() { @@ -328,6 +367,7 @@ zmq_pub::zmq_pub(void* context) throw std::logic_error{"ZMQ context cannot be NULL"}; verify_sorted(chain_contexts, "chain_contexts"); + verify_sorted(miner_contexts, "miner_contexts"); verify_sorted(txpool_contexts, "txpool_contexts"); relay_.reset(zmq_socket(context, ZMQ_PAIR)); @@ -348,22 +388,25 @@ bool zmq_pub::sub_request(boost::string_ref message) message.remove_prefix(1); const auto chain_range = get_range(chain_contexts, message); + const auto miner_range = get_range(miner_contexts, message); const auto txpool_range = get_range(txpool_contexts, message); - if (!chain_range.empty() || !txpool_range.empty()) + if (!chain_range.empty() || !miner_range.empty() || !txpool_range.empty()) { MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " << - chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)"); + chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s) and " << txpool_range.size() << " txpool topic(s)"); const boost::lock_guard<boost::mutex> lock{sync_}; switch (tag) { case 0: remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + remove_subscriptions(miner_subs_, miner_range, miner_contexts.begin()); remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); return true; case 1: add_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + add_subscriptions(miner_subs_, miner_range, miner_contexts.begin()); add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); return true; default: @@ -436,6 +479,25 @@ std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::spa return 0; } +std::size_t zmq_pub::send_miner_data(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog) +{ + boost::unique_lock<boost::mutex> guard{sync_}; + + const auto subs_copy = miner_subs_; + guard.unlock(); + + for (const std::size_t sub : subs_copy) + { + if (sub) + { + auto messages = make_pubs(subs_copy, miner_contexts, major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog); + guard.lock(); + return send_messages(relay_.get(), messages); + } + } + return 0; +} + std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes) { if (txes.empty()) @@ -466,6 +528,15 @@ void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<cons MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); } +void zmq_pub::miner_data::operator()(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog) const +{ + const std::shared_ptr<zmq_pub> self = self_.lock(); + if (self) + self->send_miner_data(major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const { const std::shared_ptr<zmq_pub> self = self_.lock(); |