aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/zmq_server.cpp
diff options
context:
space:
mode:
authorLee Clagett <code@leeclagett.com>2020-03-16 23:59:26 +0000
committerLee Clagett <code@leeclagett.com>2020-05-04 02:06:35 +0000
commite5214a2ca22cecf123bcff1ab441ed0415d08a6f (patch)
tree75a052bb95f6087421c8fedde549d6930c5af847 /src/rpc/zmq_server.cpp
parentMerge pull request #6586 (diff)
downloadmonero-e5214a2ca22cecf123bcff1ab441ed0415d08a6f.tar.xz
Adding ZMQ/Pub support for txpool_add and chain_main events
Diffstat (limited to 'src/rpc/zmq_server.cpp')
-rw-r--r--src/rpc/zmq_server.cpp171
1 files changed, 126 insertions, 45 deletions
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 1a9f49c01..6105b7f3a 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -29,10 +29,16 @@
#include "zmq_server.h"
#include <chrono>
-#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <stdexcept>
#include <system_error>
#include "byte_slice.h"
+#include "rpc/zmq_pub.h"
+
+#undef MONERO_DEFAULT_LOG_CATEGORY
+#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
namespace cryptonote
{
@@ -42,14 +48,57 @@ namespace
constexpr const int num_zmq_threads = 1;
constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
-}
+
+ net::zmq::socket init_socket(void* context, int type, epee::span<const std::string> addresses)
+ {
+ if (context == nullptr)
+ throw std::logic_error{"NULL context provided"};
+
+ net::zmq::socket out{};
+ out.reset(zmq_socket(context, type));
+ if (!out)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to create ZMQ socket");
+ return nullptr;
+ }
+
+ if (zmq_setsockopt(out.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return nullptr;
+ }
+
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(out.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return nullptr;
+ }
+
+ for (const std::string& address : addresses)
+ {
+ if (zmq_bind(out.get(), address.c_str()) < 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ bind failed");
+ return nullptr;
+ }
+ MINFO("ZMQ now listening at " << address);
+ }
+
+ return out;
+ }
+} // anonymous
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- context(zmq_init(num_zmq_threads))
+ context(zmq_init(num_zmq_threads)),
+ rep_socket(nullptr),
+ pub_socket(nullptr),
+ relay_socket(nullptr),
+ shared_state(nullptr)
{
if (!context)
MONERO_ZMQ_THROW("Unable to create ZMQ context");
@@ -64,22 +113,59 @@ void ZmqServer::serve()
try
{
// socket must close before `zmq_term` will exit.
- const net::zmq::socket socket = std::move(rep_socket);
- if (!socket)
+ const net::zmq::socket rep = std::move(rep_socket);
+ const net::zmq::socket pub = std::move(pub_socket);
+ const net::zmq::socket relay = std::move(relay_socket);
+ const std::shared_ptr<listener::zmq_pub> state = std::move(shared_state);
+
+ const unsigned init_count = unsigned(bool(pub)) + bool(relay) + bool(state);
+ if (!rep || (init_count && init_count != 3))
{
- MERROR("ZMQ RPC server reply socket is null");
+ MERROR("ZMQ RPC server socket is null");
return;
}
+ MINFO("ZMQ Server started");
+
+ const int read_flags = pub ? ZMQ_DONTWAIT : 0;
+ std::array<zmq_pollitem_t, 3> sockets =
+ {{
+ {relay.get(), 0, ZMQ_POLLIN, 0},
+ {pub.get(), 0, ZMQ_POLLIN, 0},
+ {rep.get(), 0, ZMQ_POLLIN, 0}
+ }};
+
+ /* This uses XPUB to watch for subscribers, to reduce CPU cycles for
+ serialization when the data will be dropped. This is important for block
+ serialization, which is done on the p2p threads currently (see
+ zmq_pub.cpp).
+
+ XPUB sockets are not thread-safe, so the p2p thread cannot write into
+ the socket while we read here for subscribers. A ZMQ_PAIR socket is
+ used for inproc notification. No data is every copied to kernel, it is
+ all userspace messaging. */
+
while (1)
{
- const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
- MDEBUG("Received RPC request: \"" << message << "\"");
- epee::byte_slice response = handler.handle(message);
+ if (pub)
+ MONERO_UNWRAP(net::zmq::retry_op(zmq_poll, sockets.data(), sockets.size(), -1));
- const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
- MDEBUG("Sending RPC reply: \"" << response_view << "\"");
- MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get()));
+ if (sockets[0].revents)
+ state->relay_to_pub(relay.get(), pub.get());
+
+ if (sockets[1].revents)
+ state->sub_request(MONERO_UNWRAP(net::zmq::receive(pub.get(), ZMQ_DONTWAIT)));
+
+ if (!pub || sockets[2].revents)
+ {
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(rep.get(), read_flags));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ epee::byte_slice response = handler.handle(message);
+
+ const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
+ MDEBUG("Sending RPC reply: \"" << response_view << "\"");
+ MONERO_UNWRAP(net::zmq::send(std::move(response), rep.get()));
+ }
}
}
catch (const std::system_error& e)
@@ -97,38 +183,12 @@ void ZmqServer::serve()
}
}
-bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
-{
- MERROR("ZmqServer::addIPCSocket not yet implemented!");
- return false;
-}
-
-bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
+void* ZmqServer::init_rpc(boost::string_ref address, boost::string_ref port)
{
if (!context)
{
MERROR("ZMQ RPC Server already shutdown");
- return false;
- }
-
- rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
- if (!rep_socket)
- {
- MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
- return false;
- }
-
- if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
- {
- MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
- return false;
- }
-
- static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
- if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
- {
- MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
- return false;
+ return nullptr;
}
if (address.empty())
@@ -141,12 +201,34 @@ bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
bind_address += ":";
bind_address.append(port.data(), port.size());
- if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
+ rep_socket = init_socket(context.get(), ZMQ_REP, {std::addressof(bind_address), 1});
+ return bool(rep_socket) ? context.get() : nullptr;
+}
+
+std::shared_ptr<listener::zmq_pub> ZmqServer::init_pub(epee::span<const std::string> addresses)
+{
+ try
+ {
+ shared_state = std::make_shared<listener::zmq_pub>(context.get());
+ pub_socket = init_socket(context.get(), ZMQ_XPUB, addresses);
+ if (!pub_socket)
+ throw std::runtime_error{"Unable to initialize ZMQ_XPUB socket"};
+
+ const std::string relay_address[] = {listener::zmq_pub::relay_endpoint()};
+ relay_socket = init_socket(context.get(), ZMQ_PAIR, relay_address);
+ if (!relay_socket)
+ throw std::runtime_error{"Unable to initialize ZMQ_PAIR relay"};
+ }
+ catch (const std::runtime_error& e)
{
- MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
- return false;
+ shared_state = nullptr;
+ pub_socket = nullptr;
+ relay_socket = nullptr;
+ MERROR("Failed to create ZMQ/Pub listener: " << e.what());
+ return nullptr;
}
- return true;
+
+ return shared_state;
}
void ZmqServer::run()
@@ -163,7 +245,6 @@ void ZmqServer::stop()
run_thread.join();
}
-
} // namespace cryptonote
} // namespace rpc