aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlexander Blair <snipa@jagtech.io>2020-08-05 07:54:14 -0700
committerAlexander Blair <snipa@jagtech.io>2020-08-05 07:54:31 -0700
commitd9deb2c2fe66010bfb7978ae17c7d278143b546d (patch)
tree75a052bb95f6087421c8fedde549d6930c5af847 /src/rpc
parentMerge pull request #6586 (diff)
parentAdding ZMQ/Pub support for txpool_add and chain_main events (diff)
downloadmonero-d9deb2c2fe66010bfb7978ae17c7d278143b546d.tar.xz
Merge pull request #6418
e5214a2ca Adding ZMQ/Pub support for txpool_add and chain_main events (Lee Clagett)
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/CMakeLists.txt22
-rw-r--r--src/rpc/fwd.h37
-rw-r--r--src/rpc/zmq_pub.cpp478
-rw-r--r--src/rpc/zmq_pub.h110
-rw-r--r--src/rpc/zmq_server.cpp171
-rw-r--r--src/rpc/zmq_server.h23
6 files changed, 788 insertions, 53 deletions
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt
index 35195bd98..19298c969 100644
--- a/src/rpc/CMakeLists.txt
+++ b/src/rpc/CMakeLists.txt
@@ -45,8 +45,11 @@ set(daemon_messages_sources
message.cpp
daemon_messages.cpp)
+set(rpc_pub_sources zmq_pub.cpp)
+
set(daemon_rpc_server_sources
daemon_handler.cpp
+ zmq_pub.cpp
zmq_server.cpp)
@@ -59,8 +62,9 @@ set(rpc_headers
rpc_version_str.h
rpc_handler.h)
-set(daemon_rpc_server_headers)
+set(rpc_pub_headers zmq_pub.h)
+set(daemon_rpc_server_headers)
set(rpc_daemon_private_headers
bootstrap_daemon.h
@@ -83,6 +87,8 @@ set(daemon_rpc_server_private_headers
monero_private_headers(rpc
${rpc_private_headers})
+set(rpc_pub_private_headers)
+
monero_private_headers(daemon_rpc_server
${daemon_rpc_server_private_headers})
@@ -97,6 +103,11 @@ monero_add_library(rpc
${rpc_headers}
${rpc_private_headers})
+monero_add_library(rpc_pub
+ ${rpc_pub_sources}
+ ${rpc_pub_headers}
+ ${rpc_pub_private_headers})
+
monero_add_library(daemon_messages
${daemon_messages_sources}
${daemon_messages_headers}
@@ -131,6 +142,14 @@ target_link_libraries(rpc
PRIVATE
${EXTRA_LIBRARIES})
+target_link_libraries(rpc_pub
+ PUBLIC
+ epee
+ net
+ cryptonote_basic
+ serialization
+ ${Boost_THREAD_LIBRARY})
+
target_link_libraries(daemon_messages
LINK_PRIVATE
cryptonote_core
@@ -142,6 +161,7 @@ target_link_libraries(daemon_messages
target_link_libraries(daemon_rpc_server
LINK_PRIVATE
rpc
+ rpc_pub
cryptonote_core
cryptonote_protocol
version
diff --git a/src/rpc/fwd.h b/src/rpc/fwd.h
new file mode 100644
index 000000000..72537f5a5
--- /dev/null
+++ b/src/rpc/fwd.h
@@ -0,0 +1,37 @@
+// Copyright (c) 2019-2020, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#pragma once
+
+namespace cryptonote
+{
+ namespace listener
+ {
+ class zmq_pub;
+ }
+}
diff --git a/src/rpc/zmq_pub.cpp b/src/rpc/zmq_pub.cpp
new file mode 100644
index 000000000..0dffffac6
--- /dev/null
+++ b/src/rpc/zmq_pub.cpp
@@ -0,0 +1,478 @@
+// Copyright (c) 2020, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include "zmq_pub.h"
+
+#include <algorithm>
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/thread/locks.hpp>
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <rapidjson/document.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include "common/expect.h"
+#include "crypto/crypto.h"
+#include "cryptonote_basic/cryptonote_format_utils.h"
+#include "cryptonote_basic/events.h"
+#include "misc_log_ex.h"
+#include "serialization/json_object.h"
+
+#undef MONERO_DEFAULT_LOG_CATEGORY
+#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
+
+namespace
+{
+ constexpr const char txpool_signal[] = "tx_signal";
+
+ using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>);
+ using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>);
+
+ template<typename F>
+ struct context
+ {
+ char const* const name;
+ F* generate_pub;
+ };
+
+ template<typename T>
+ bool operator<(const context<T>& lhs, const context<T>& rhs) noexcept
+ {
+ return std::strcmp(lhs.name, rhs.name) < 0;
+ }
+
+ template<typename T>
+ bool operator<(const context<T>& lhs, const boost::string_ref rhs) noexcept
+ {
+ return lhs.name < rhs;
+ }
+
+ struct is_valid
+ {
+ bool operator()(const cryptonote::txpool_event& event) const noexcept
+ {
+ return event.res;
+ }
+ };
+
+ template<typename T, std::size_t N>
+ void verify_sorted(const std::array<context<T>, N>& elems, const char* name)
+ {
+ auto unsorted = std::is_sorted_until(elems.begin(), elems.end());
+ if (unsorted != elems.end())
+ throw std::logic_error{name + std::string{" array is not properly sorted, see: "} + unsorted->name};
+ }
+
+ void write_header(epee::byte_stream& buf, const boost::string_ref name)
+ {
+ buf.write(name.data(), name.size());
+ buf.put(':');
+ }
+
+ //! \return `name:...` where `...` is JSON and `name` is directly copied (no quotes - not JSON).
+ template<typename T>
+ void json_pub(epee::byte_stream& buf, const T value)
+ {
+ rapidjson::Writer<epee::byte_stream> dest{buf};
+ using cryptonote::json::toJsonValue;
+ toJsonValue(dest, value);
+ }
+
+ //! Object for "minimal" block serialization
+ struct minimal_chain
+ {
+ const std::uint64_t height;
+ const epee::span<const cryptonote::block> blocks;
+ };
+
+ //! Object for "minimal" tx serialization
+ struct minimal_txpool
+ {
+ const cryptonote::transaction& tx;
+ };
+
+ void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain self)
+ {
+ namespace adapt = boost::adaptors;
+
+ const auto to_block_id = [](const cryptonote::block& bl)
+ {
+ crypto::hash id;
+ if (!get_block_hash(bl, id))
+ MERROR("ZMQ/Pub failure: get_block_hash");
+ return id;
+ };
+
+ assert(!self.blocks.empty()); // checked in zmq_pub::send_chain_main
+
+ dest.StartObject();
+ INSERT_INTO_JSON_OBJECT(dest, first_height, self.height);
+ INSERT_INTO_JSON_OBJECT(dest, first_prev_id, self.blocks[0].prev_id);
+ INSERT_INTO_JSON_OBJECT(dest, ids, (self.blocks | adapt::transformed(to_block_id)));
+ dest.EndObject();
+ }
+
+ void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool self)
+ {
+ crypto::hash id{};
+ std::size_t blob_size = 0;
+ if (!get_transaction_hash(self.tx, id, blob_size))
+ {
+ MERROR("ZMQ/Pub failure: get_transaction_hash");
+ return;
+ }
+
+ dest.StartObject();
+ INSERT_INTO_JSON_OBJECT(dest, id, id);
+ INSERT_INTO_JSON_OBJECT(dest, blob_size, blob_size);
+ dest.EndObject();
+ }
+
+ void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
+ {
+ json_pub(buf, blocks);
+ }
+
+ void json_minimal_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
+ {
+ json_pub(buf, minimal_chain{height, blocks});
+ }
+
+ // boost::adaptors are in place "views" - no copy/move takes place
+ // moving transactions (via sort, etc.), is expensive!
+
+ void json_full_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
+ {
+ namespace adapt = boost::adaptors;
+ const auto to_full_tx = [](const cryptonote::txpool_event& event)
+ {
+ return event.tx;
+ };
+ json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_full_tx)));
+ }
+
+ void json_minimal_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
+ {
+ namespace adapt = boost::adaptors;
+ const auto to_minimal_tx = [](const cryptonote::txpool_event& event)
+ {
+ return minimal_txpool{event.tx};
+ };
+ json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_minimal_tx)));
+ }
+
+ constexpr const std::array<context<chain_writer>, 2> chain_contexts =
+ {{
+ {u8"json-full-chain_main", json_full_chain},
+ {u8"json-minimal-chain_main", json_minimal_chain}
+ }};
+
+ constexpr const std::array<context<txpool_writer>, 2> txpool_contexts =
+ {{
+ {u8"json-full-txpool_add", json_full_txpool},
+ {u8"json-minimal-txpool_add", json_minimal_txpool}
+ }};
+
+ template<typename T, std::size_t N>
+ epee::span<const context<T>> get_range(const std::array<context<T>, N>& contexts, const boost::string_ref value)
+ {
+ const auto not_prefix = [](const boost::string_ref lhs, const context<T>& rhs)
+ {
+ return !(boost::string_ref{rhs.name}.starts_with(lhs));
+ };
+
+ const auto lower = std::lower_bound(contexts.begin(), contexts.end(), value);
+ const auto upper = std::upper_bound(lower, contexts.end(), value, not_prefix);
+ return {lower, std::size_t(upper - lower)};
+ }
+
+ template<std::size_t N, typename T>
+ void add_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
+ {
+ assert(range.size() <= N);
+ assert(range.begin() - first <= N - range.size());
+
+ for (const auto& ctx : range)
+ {
+ const std::size_t i = std::addressof(ctx) - first;
+ subs[i] = std::min(std::numeric_limits<std::size_t>::max() - 1, subs[i]) + 1;
+ }
+ }
+
+ template<std::size_t N, typename T>
+ void remove_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
+ {
+ assert(range.size() <= N);
+ assert(range.begin() - first <= N - range.size());
+
+ for (const auto& ctx : range)
+ {
+ const std::size_t i = std::addressof(ctx) - first;
+ subs[i] = std::max(std::size_t(1), subs[i]) - 1;
+ }
+ }
+
+ template<std::size_t N, typename T, typename... U>
+ std::array<epee::byte_slice, N> make_pubs(const std::array<std::size_t, N>& subs, const std::array<context<T>, N>& contexts, U&&... args)
+ {
+ epee::byte_stream buf{};
+
+ std::size_t last_offset = 0;
+ std::array<std::size_t, N> offsets{{}};
+ for (std::size_t i = 0; i < N; ++i)
+ {
+ if (subs[i])
+ {
+ write_header(buf, contexts[i].name);
+ contexts[i].generate_pub(buf, std::forward<U>(args)...);
+ offsets[i] = buf.size() - last_offset;
+ last_offset = buf.size();
+ }
+ }
+
+ epee::byte_slice bytes{std::move(buf)};
+ std::array<epee::byte_slice, N> out;
+ for (std::size_t i = 0; i < N; ++i)
+ out[i] = bytes.take_slice(offsets[i]);
+
+ return out;
+ }
+
+ template<std::size_t N>
+ std::size_t send_messages(void* const socket, std::array<epee::byte_slice, N>& messages)
+ {
+ std::size_t count = 0;
+ for (epee::byte_slice& message : messages)
+ {
+ if (!message.empty())
+ {
+ const expect<void> sent = net::zmq::send(std::move(message), socket, ZMQ_DONTWAIT);
+ if (!sent)
+ MERROR("Failed to send ZMQ/Pub message: " << sent.error().message());
+ else
+ ++count;
+ }
+ }
+ return count;
+ }
+
+ expect<bool> relay_block_pub(void* const relay, void* const pub) noexcept
+ {
+ zmq_msg_t msg;
+ zmq_msg_init(std::addressof(msg));
+ MONERO_CHECK(net::zmq::retry_op(zmq_msg_recv, std::addressof(msg), relay, ZMQ_DONTWAIT));
+
+ const boost::string_ref payload{
+ reinterpret_cast<const char*>(zmq_msg_data(std::addressof(msg))),
+ zmq_msg_size(std::addressof(msg))
+ };
+
+ if (payload == txpool_signal)
+ {
+ zmq_msg_close(std::addressof(msg));
+ return false;
+ }
+
+ // forward block messages (serialized on P2P thread for now)
+ const expect<void> sent = net::zmq::retry_op(zmq_msg_send, std::addressof(msg), pub, ZMQ_DONTWAIT);
+ if (!sent)
+ {
+ zmq_msg_close(std::addressof(msg));
+ return sent.error();
+ }
+ return true;
+ }
+} // anonymous
+
+namespace cryptonote { namespace listener
+{
+
+zmq_pub::zmq_pub(void* context)
+ : relay_(),
+ chain_subs_{{0}},
+ txpool_subs_{{0}},
+ sync_()
+{
+ if (!context)
+ throw std::logic_error{"ZMQ context cannot be NULL"};
+
+ verify_sorted(chain_contexts, "chain_contexts");
+ verify_sorted(txpool_contexts, "txpool_contexts");
+
+ relay_.reset(zmq_socket(context, ZMQ_PAIR));
+ if (!relay_)
+ MONERO_ZMQ_THROW("Failed to create relay socket");
+ if (zmq_connect(relay_.get(), relay_endpoint()) != 0)
+ MONERO_ZMQ_THROW("Failed to connect relay socket");
+}
+
+zmq_pub::~zmq_pub()
+{}
+
+bool zmq_pub::sub_request(boost::string_ref message)
+{
+ if (!message.empty())
+ {
+ const char tag = message[0];
+ message.remove_prefix(1);
+
+ const auto chain_range = get_range(chain_contexts, message);
+ const auto txpool_range = get_range(txpool_contexts, message);
+
+ if (!chain_range.empty() || !txpool_range.empty())
+ {
+ MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " <<
+ chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)");
+
+ const boost::lock_guard<boost::mutex> lock{sync_};
+ switch (tag)
+ {
+ case 0:
+ remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
+ remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
+ return true;
+ case 1:
+ add_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
+ add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
+ return true;
+ default:
+ break;
+ }
+ }
+ }
+ MERROR("Invalid ZMQ/Sub message");
+ return false;
+}
+
+bool zmq_pub::relay_to_pub(void* const relay, void* const pub)
+{
+ const expect<bool> relayed = relay_block_pub(relay, pub);
+ if (!relayed)
+ {
+ MERROR("Error relaying ZMQ/Pub: " << relayed.error().message());
+ return false;
+ }
+
+ if (!*relayed)
+ {
+ std::array<std::size_t, 2> subs;
+ std::vector<cryptonote::txpool_event> events;
+ {
+ const boost::lock_guard<boost::mutex> lock{sync_};
+ if (txes_.empty())
+ return false;
+
+ subs = txpool_subs_;
+ events = std::move(txes_.front());
+ txes_.pop_front();
+ }
+ auto messages = make_pubs(subs, txpool_contexts, epee::to_span(events));
+ send_messages(pub, messages);
+ MDEBUG("Sent txpool ZMQ/Pub");
+ }
+ else
+ MDEBUG("Sent chain_main ZMQ/Pub");
+
+ return true;
+}
+
+std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
+{
+ if (blocks.empty())
+ return 0;
+
+ /* Block format only sends one block at a time - multiple block notifications
+ are less common and only occur on rollbacks. */
+
+ boost::unique_lock<boost::mutex> guard{sync_};
+
+ const auto subs_copy = chain_subs_;
+ guard.unlock();
+
+ for (const std::size_t sub : subs_copy)
+ {
+ if (sub)
+ {
+ /* cryptonote_core/blockchain.cpp cannot "give" us the block like core
+ does for txpool events. Since copying the block is expensive anyway,
+ serialization is done right here on the p2p thread (for now). */
+
+ auto messages = make_pubs(subs_copy, chain_contexts, height, blocks);
+ guard.lock();
+ return send_messages(relay_.get(), messages);
+ }
+ }
+ return 0;
+}
+
+std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes)
+{
+ if (txes.empty())
+ return 0;
+
+ const boost::lock_guard<boost::mutex> lock{sync_};
+ for (const std::size_t sub : txpool_subs_)
+ {
+ if (sub)
+ {
+ const expect<void> sent = net::zmq::retry_op(zmq_send_const, relay_.get(), txpool_signal, sizeof(txpool_signal) - 1, ZMQ_DONTWAIT);
+ if (sent)
+ txes_.emplace_back(std::move(txes));
+ else
+ MERROR("ZMQ/Pub failure, relay queue error: " << sent.error().message());
+ return bool(sent);
+ }
+ }
+ return 0;
+}
+
+void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<const cryptonote::block> blocks) const
+{
+ const std::shared_ptr<zmq_pub> self = self_.lock();
+ if (self)
+ self->send_chain_main(height, blocks);
+ else
+ MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
+}
+
+void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const
+{
+ const std::shared_ptr<zmq_pub> self = self_.lock();
+ if (self)
+ self->send_txpool_add(std::move(txes));
+ else
+ MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
+}
+
+}}
diff --git a/src/rpc/zmq_pub.h b/src/rpc/zmq_pub.h
new file mode 100644
index 000000000..02e6b8103
--- /dev/null
+++ b/src/rpc/zmq_pub.h
@@ -0,0 +1,110 @@
+// Copyright (c) 2020, The Monero Project
+//
+// All rights reserved.
+ //
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#pragma once
+
+#include <array>
+#include <boost/thread/mutex.hpp>
+#include <boost/utility/string_ref.hpp>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <vector>
+
+#include "cryptonote_basic/fwd.h"
+#include "net/zmq.h"
+#include "span.h"
+
+namespace cryptonote { namespace listener
+{
+/*! \brief Sends ZMQ PUB messages on cryptonote events
+
+ Clients must ensure that all transaction(s) are notified before any blocks
+ they are contained in, and must ensure that each block is notified in chain
+ order. An external lock **must** be held by clients during the entire
+ txpool check and notification sequence and (a possibly second) lock is held
+ during the entire block check and notification sequence. Otherwise, events
+ could be sent in a different order than processed. */
+class zmq_pub
+{
+ /* Each socket has its own internal queue. So we can only use one socket, else
+ the messages being published are not guaranteed to be in the same order
+ pushed. */
+
+ net::zmq::socket relay_;
+ std::deque<std::vector<txpool_event>> txes_;
+ std::array<std::size_t, 2> chain_subs_;
+ std::array<std::size_t, 2> txpool_subs_;
+ boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays.
+
+ public:
+ //! \return Name of ZMQ_PAIR endpoint for pub notifications
+ static constexpr const char* relay_endpoint() noexcept { return "inproc://pub_relay"; }
+
+ explicit zmq_pub(void* context);
+
+ zmq_pub(const zmq_pub&) = delete;
+ zmq_pub(zmq_pub&&) = delete;
+
+ ~zmq_pub();
+
+ zmq_pub& operator=(const zmq_pub&) = delete;
+ zmq_pub& operator=(zmq_pub&&) = delete;
+
+ //! Process a client subscription request (from XPUB sockets). Thread-safe.
+ bool sub_request(const boost::string_ref message);
+
+ /*! Forward ZMQ messages sent to `relay` via `send_chain_main` or
+ `send_txpool_add` to `pub`. Used by `ZmqServer`. */
+ bool relay_to_pub(void* relay, void* pub);
+
+ /*! Send a `ZMQ_PUB` notification for a change to the main chain.
+ Thread-safe.
+ \return Number of ZMQ messages sent to relay. */
+ std::size_t send_chain_main(std::uint64_t height, epee::span<const cryptonote::block> blocks);
+
+ /*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local
+ pool. Thread-safe.
+ \return Number of ZMQ messages sent to relay. */
+ std::size_t send_txpool_add(std::vector<cryptonote::txpool_event> txes);
+
+ //! Callable for `send_chain_main` with weak ownership to `zmq_pub` object.
+ struct chain_main
+ {
+ std::weak_ptr<zmq_pub> self_;
+ void operator()(std::uint64_t height, epee::span<const cryptonote::block> blocks) const;
+ };
+
+ //! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object.
+ struct txpool_add
+ {
+ std::weak_ptr<zmq_pub> self_;
+ void operator()(std::vector<cryptonote::txpool_event> txes) const;
+ };
+ };
+}}
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 1a9f49c01..6105b7f3a 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -29,10 +29,16 @@
#include "zmq_server.h"
#include <chrono>
-#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <stdexcept>
#include <system_error>
#include "byte_slice.h"
+#include "rpc/zmq_pub.h"
+
+#undef MONERO_DEFAULT_LOG_CATEGORY
+#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
namespace cryptonote
{
@@ -42,14 +48,57 @@ namespace
constexpr const int num_zmq_threads = 1;
constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
-}
+
+ net::zmq::socket init_socket(void* context, int type, epee::span<const std::string> addresses)
+ {
+ if (context == nullptr)
+ throw std::logic_error{"NULL context provided"};
+
+ net::zmq::socket out{};
+ out.reset(zmq_socket(context, type));
+ if (!out)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to create ZMQ socket");
+ return nullptr;
+ }
+
+ if (zmq_setsockopt(out.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return nullptr;
+ }
+
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(out.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return nullptr;
+ }
+
+ for (const std::string& address : addresses)
+ {
+ if (zmq_bind(out.get(), address.c_str()) < 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ bind failed");
+ return nullptr;
+ }
+ MINFO("ZMQ now listening at " << address);
+ }
+
+ return out;
+ }
+} // anonymous
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- context(zmq_init(num_zmq_threads))
+ context(zmq_init(num_zmq_threads)),
+ rep_socket(nullptr),
+ pub_socket(nullptr),
+ relay_socket(nullptr),
+ shared_state(nullptr)
{
if (!context)
MONERO_ZMQ_THROW("Unable to create ZMQ context");
@@ -64,22 +113,59 @@ void ZmqServer::serve()
try
{
// socket must close before `zmq_term` will exit.
- const net::zmq::socket socket = std::move(rep_socket);
- if (!socket)
+ const net::zmq::socket rep = std::move(rep_socket);
+ const net::zmq::socket pub = std::move(pub_socket);
+ const net::zmq::socket relay = std::move(relay_socket);
+ const std::shared_ptr<listener::zmq_pub> state = std::move(shared_state);
+
+ const unsigned init_count = unsigned(bool(pub)) + bool(relay) + bool(state);
+ if (!rep || (init_count && init_count != 3))
{
- MERROR("ZMQ RPC server reply socket is null");
+ MERROR("ZMQ RPC server socket is null");
return;
}
+ MINFO("ZMQ Server started");
+
+ const int read_flags = pub ? ZMQ_DONTWAIT : 0;
+ std::array<zmq_pollitem_t, 3> sockets =
+ {{
+ {relay.get(), 0, ZMQ_POLLIN, 0},
+ {pub.get(), 0, ZMQ_POLLIN, 0},
+ {rep.get(), 0, ZMQ_POLLIN, 0}
+ }};
+
+ /* This uses XPUB to watch for subscribers, to reduce CPU cycles for
+ serialization when the data will be dropped. This is important for block
+ serialization, which is done on the p2p threads currently (see
+ zmq_pub.cpp).
+
+ XPUB sockets are not thread-safe, so the p2p thread cannot write into
+ the socket while we read here for subscribers. A ZMQ_PAIR socket is
+ used for inproc notification. No data is every copied to kernel, it is
+ all userspace messaging. */
+
while (1)
{
- const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
- MDEBUG("Received RPC request: \"" << message << "\"");
- epee::byte_slice response = handler.handle(message);
+ if (pub)
+ MONERO_UNWRAP(net::zmq::retry_op(zmq_poll, sockets.data(), sockets.size(), -1));
- const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
- MDEBUG("Sending RPC reply: \"" << response_view << "\"");
- MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get()));
+ if (sockets[0].revents)
+ state->relay_to_pub(relay.get(), pub.get());
+
+ if (sockets[1].revents)
+ state->sub_request(MONERO_UNWRAP(net::zmq::receive(pub.get(), ZMQ_DONTWAIT)));
+
+ if (!pub || sockets[2].revents)
+ {
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(rep.get(), read_flags));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ epee::byte_slice response = handler.handle(message);
+
+ const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
+ MDEBUG("Sending RPC reply: \"" << response_view << "\"");
+ MONERO_UNWRAP(net::zmq::send(std::move(response), rep.get()));
+ }
}
}
catch (const std::system_error& e)
@@ -97,38 +183,12 @@ void ZmqServer::serve()
}
}
-bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
-{
- MERROR("ZmqServer::addIPCSocket not yet implemented!");
- return false;
-}
-
-bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
+void* ZmqServer::init_rpc(boost::string_ref address, boost::string_ref port)
{
if (!context)
{
MERROR("ZMQ RPC Server already shutdown");
- return false;
- }
-
- rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
- if (!rep_socket)
- {
- MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
- return false;
- }
-
- if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
- {
- MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
- return false;
- }
-
- static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
- if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
- {
- MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
- return false;
+ return nullptr;
}
if (address.empty())
@@ -141,12 +201,34 @@ bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
bind_address += ":";
bind_address.append(port.data(), port.size());
- if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
+ rep_socket = init_socket(context.get(), ZMQ_REP, {std::addressof(bind_address), 1});
+ return bool(rep_socket) ? context.get() : nullptr;
+}
+
+std::shared_ptr<listener::zmq_pub> ZmqServer::init_pub(epee::span<const std::string> addresses)
+{
+ try
+ {
+ shared_state = std::make_shared<listener::zmq_pub>(context.get());
+ pub_socket = init_socket(context.get(), ZMQ_XPUB, addresses);
+ if (!pub_socket)
+ throw std::runtime_error{"Unable to initialize ZMQ_XPUB socket"};
+
+ const std::string relay_address[] = {listener::zmq_pub::relay_endpoint()};
+ relay_socket = init_socket(context.get(), ZMQ_PAIR, relay_address);
+ if (!relay_socket)
+ throw std::runtime_error{"Unable to initialize ZMQ_PAIR relay"};
+ }
+ catch (const std::runtime_error& e)
{
- MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
- return false;
+ shared_state = nullptr;
+ pub_socket = nullptr;
+ relay_socket = nullptr;
+ MERROR("Failed to create ZMQ/Pub listener: " << e.what());
+ return nullptr;
}
- return true;
+
+ return shared_state;
}
void ZmqServer::run()
@@ -163,7 +245,6 @@ void ZmqServer::stop()
run_thread.join();
}
-
} // namespace cryptonote
} // namespace rpc
diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h
index 1143db839..ddf44b411 100644
--- a/src/rpc/zmq_server.h
+++ b/src/rpc/zmq_server.h
@@ -30,10 +30,16 @@
#include <boost/thread/thread.hpp>
#include <boost/utility/string_ref.hpp>
+#include <cstdint>
+#include <memory>
+#include <string>
#include "common/command_line.h"
+#include "cryptonote_basic/fwd.h"
#include "net/zmq.h"
-#include "rpc_handler.h"
+#include "rpc/fwd.h"
+#include "rpc/rpc_handler.h"
+#include "span.h"
namespace cryptonote
{
@@ -41,7 +47,7 @@ namespace cryptonote
namespace rpc
{
-class ZmqServer
+class ZmqServer final
{
public:
@@ -49,12 +55,13 @@ class ZmqServer
~ZmqServer();
- static void init_options(boost::program_options::options_description& desc);
-
void serve();
- bool addIPCSocket(boost::string_ref address, boost::string_ref port);
- bool addTCPSocket(boost::string_ref address, boost::string_ref port);
+ //! \return ZMQ context on success, `nullptr` on failure
+ void* init_rpc(boost::string_ref address, boost::string_ref port);
+
+ //! \return `nullptr` on errors.
+ std::shared_ptr<listener::zmq_pub> init_pub(epee::span<const std::string> addresses);
void run();
void stop();
@@ -67,9 +74,11 @@ class ZmqServer
boost::thread run_thread;
net::zmq::socket rep_socket;
+ net::zmq::socket pub_socket;
+ net::zmq::socket relay_socket;
+ std::shared_ptr<listener::zmq_pub> shared_state;
};
-
} // namespace cryptonote
} // namespace rpc