aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorluigi1111 <luigi1111w@gmail.com>2019-09-14 13:25:14 -0500
committerluigi1111 <luigi1111w@gmail.com>2019-09-14 13:25:14 -0500
commitd663e1e3dbda1f12e2b0ea9ed74cfba2a953216e (patch)
treee5102407744d67a3cd4173840ffb728b880a55ee /src
parentMerge pull request #5899 (diff)
parentDropping cppzmq dependency; adding some zmq utils (diff)
downloadmonero-d663e1e3dbda1f12e2b0ea9ed74cfba2a953216e.tar.xz
Merge pull request #5818
f91a06c Dropping cppzmq dependency; adding some zmq utils (vtnerd)
Diffstat (limited to '')
-rw-r--r--src/net/CMakeLists.txt6
-rw-r--r--src/net/zmq.cpp188
-rw-r--r--src/net/zmq.h136
-rw-r--r--src/rpc/CMakeLists.txt2
-rw-r--r--src/rpc/zmq_server.cpp136
-rw-r--r--src/rpc/zmq_server.h20
6 files changed, 415 insertions, 73 deletions
diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt
index 24b707f77..339587ffa 100644
--- a/src/net/CMakeLists.txt
+++ b/src/net/CMakeLists.txt
@@ -26,8 +26,10 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp)
-set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h)
+set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp
+ socks_connect.cpp tor_address.cpp zmq.cpp)
+set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h
+ tor_address.h zmq.h)
monero_add_library(net ${net_sources} ${net_headers})
target_link_libraries(net common epee ${Boost_ASIO_LIBRARY})
diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp
new file mode 100644
index 000000000..d02a22983
--- /dev/null
+++ b/src/net/zmq.cpp
@@ -0,0 +1,188 @@
+// Copyright (c) 2019, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include "net/zmq.h"
+
+#include <cassert>
+#include <cerrno>
+#include <limits>
+#include <utility>
+
+namespace net
+{
+namespace zmq
+{
+ const std::error_category& error_category() noexcept
+ {
+ struct category final : std::error_category
+ {
+ virtual const char* name() const noexcept override final
+ {
+ return "error::error_category()";
+ }
+
+ virtual std::string message(int value) const override final
+ {
+ char const* const msg = zmq_strerror(value);
+ if (msg)
+ return msg;
+ return "zmq_strerror failure";
+ }
+
+ virtual std::error_condition default_error_condition(int value) const noexcept override final
+ {
+ // maps specific errors to generic `std::errc` cases.
+ switch (value)
+ {
+ case EFSM:
+ case ETERM:
+ break;
+ default:
+ /* zmq is using cerrno errors. C++ spec indicates that
+ `std::errc` values must be identical to the cerrno value.
+ So just map every zmq specific error to the generic errc
+ equivalent. zmq extensions must be in the switch or they
+ map to a non-existent errc enum value. */
+ return std::errc(value);
+ }
+ return std::error_condition{value, *this};
+ }
+
+ };
+ static const category instance{};
+ return instance;
+ }
+
+ void terminate::call(void* ptr) noexcept
+ {
+ assert(ptr != nullptr); // see header
+ while (zmq_term(ptr))
+ {
+ if (zmq_errno() != EINTR)
+ break;
+ }
+ }
+
+ namespace
+ {
+ //! RAII wrapper for `zmq_msg_t`.
+ class message
+ {
+ zmq_msg_t handle_;
+
+ public:
+ message() noexcept
+ : handle_()
+ {
+ zmq_msg_init(handle());
+ }
+
+ message(message&& rhs) = delete;
+ message(const message& rhs) = delete;
+ message& operator=(message&& rhs) = delete;
+ message& operator=(const message& rhs) = delete;
+
+ ~message() noexcept
+ {
+ zmq_msg_close(handle());
+ }
+
+ zmq_msg_t* handle() noexcept
+ {
+ return std::addressof(handle_);
+ }
+
+ const char* data() noexcept
+ {
+ return static_cast<const char*>(zmq_msg_data(handle()));
+ }
+
+ std::size_t size() noexcept
+ {
+ return zmq_msg_size(handle());
+ }
+ };
+
+ struct do_receive
+ {
+ /* ZMQ documentation states that message parts are atomic - either
+ all are received or none are. Looking through ZMQ code and
+ Github discussions indicates that after part 1 is returned,
+ `EAGAIN` cannot be returned to meet these guarantees. Unit tests
+ verify (for the `inproc://` case) that this is the behavior.
+ Therefore, read errors after the first part are treated as a
+ failure for the entire message (probably `ETERM`). */
+ int operator()(std::string& payload, void* const socket, const int flags) const
+ {
+ static constexpr const int max_out = std::numeric_limits<int>::max();
+ const std::string::size_type initial = payload.size();
+ message part{};
+ for (;;)
+ {
+ int last = 0;
+ if ((last = zmq_msg_recv(part.handle(), socket, flags)) < 0)
+ return last;
+
+ payload.append(part.data(), part.size());
+ if (!zmq_msg_more(part.handle()))
+ break;
+ }
+ const std::string::size_type added = payload.size() - initial;
+ return unsigned(max_out) < added ? max_out : int(added);
+ }
+ };
+
+ template<typename F, typename... T>
+ expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
+ {
+ for (;;)
+ {
+ if (0 <= op(args...))
+ return success();
+
+ const int error = zmq_errno();
+ if (error != EINTR)
+ return make_error_code(error);
+ }
+ }
+ } // anonymous
+
+ expect<std::string> receive(void* const socket, const int flags)
+ {
+ std::string payload{};
+ MONERO_CHECK(retry_op(do_receive{}, payload, socket, flags));
+ return {std::move(payload)};
+ }
+
+ expect<void> send(const epee::span<const std::uint8_t> payload, void* const socket, const int flags) noexcept
+ {
+ return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
+ }
+} // zmq
+} // net
+
diff --git a/src/net/zmq.h b/src/net/zmq.h
new file mode 100644
index 000000000..c6a7fd743
--- /dev/null
+++ b/src/net/zmq.h
@@ -0,0 +1,136 @@
+// Copyright (c) 2019, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include <memory>
+#include <string>
+#include <system_error>
+#include <zmq.h>
+
+#include "common/expect.h"
+#include "span.h"
+
+//! If the expression is less than 0, return the current ZMQ error code.
+#define MONERO_ZMQ_CHECK(...) \
+ do \
+ { \
+ if (( __VA_ARGS__ ) < 0) \
+ return {::net::zmq::get_error_code()}; \
+ } while (0)
+
+//! Print a message followed by the current ZMQ error message.
+#define MONERO_LOG_ZMQ_ERROR(...) \
+ do \
+ { \
+ MERROR( __VA_ARGS__ << ": " << ::net::zmq::get_error_code().message()); \
+ } while (0)
+
+//! Throw an exception with a custom `msg`, current ZMQ error code, filename, and line number.
+#define MONERO_ZMQ_THROW(msg) \
+ MONERO_THROW( ::net::zmq::get_error_code(), msg )
+
+namespace net
+{
+namespace zmq
+{
+ //! \return Category for ZMQ errors.
+ const std::error_category& error_category() noexcept;
+
+ //! \return `code` (usally from zmq_errno()`) using `net::zmq::error_category()`.
+ inline std::error_code make_error_code(int code) noexcept
+ {
+ return std::error_code{code, error_category()};
+ }
+
+ //! \return Error from `zmq_errno()` using `net::zmq::error_category()`.
+ inline std::error_code get_error_code() noexcept
+ {
+ return make_error_code(zmq_errno());
+ }
+
+ //! Calls `zmq_term`
+ class terminate
+ {
+ static void call(void* ptr) noexcept;
+ public:
+ void operator()(void* ptr) const noexcept
+ {
+ if (ptr)
+ call(ptr);
+ }
+ };
+
+ //! Calls `zmq_close`
+ struct close
+ {
+ void operator()(void* ptr) const noexcept
+ {
+ if (ptr)
+ zmq_close(ptr);
+ }
+ };
+
+ //! Unique ZMQ context handle, calls `zmq_term` on destruction.
+ using context = std::unique_ptr<void, terminate>;
+
+ //! Unique ZMQ socket handle, calls `zmq_close` on destruction.
+ using socket = std::unique_ptr<void, close>;
+
+ /*! Read all parts of the next message on `socket`. Blocks until the entire
+ next message (all parts) are read, or until `zmq_term` is called on the
+ `zmq_context` associated with `socket`. If the context is terminated,
+ `make_error_code(ETERM)` is returned.
+
+ \note This will automatically retry on `EINTR`, so exiting on
+ interrupts requires context termination.
+ \note If non-blocking behavior is requested on `socket` or by `flags`,
+ then `net::zmq::make_error_code(EAGAIN)` will be returned if this
+ would block.
+
+ \param socket Handle created with `zmq_socket`.
+ \param flags See `zmq_msg_read` for possible flags.
+ \return Message payload read from `socket` or ZMQ error. */
+ expect<std::string> receive(void* socket, int flags = 0);
+
+ /*! Sends `payload` on `socket`. Blocks until the entire message is queued
+ for sending, or until `zmq_term` is called on the `zmq_context`
+ associated with `socket`. If the context is terminated,
+ `make_error_code(ETERM)` is returned.
+
+ \note This will automatically retry on `EINTR`, so exiting on
+ interrupts requires context termination.
+ \note If non-blocking behavior is requested on `socket` or by `flags`,
+ then `net::zmq::make_error_code(EAGAIN)` will be returned if this
+ would block.
+
+ \param payload sent as one message on `socket`.
+ \param socket Handle created with `zmq_socket`.
+ \param flags See `zmq_send` for possible flags.
+ \return `success()` if sent, otherwise ZMQ error. */
+ expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
+} // zmq
+} // net
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt
index d98670e05..116e7f568 100644
--- a/src/rpc/CMakeLists.txt
+++ b/src/rpc/CMakeLists.txt
@@ -26,6 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+include_directories(SYSTEM ${ZMQ_INCLUDE_PATH})
+
set(rpc_base_sources
rpc_args.cpp)
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 668a2e5cd..1ee55673e 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -28,18 +28,29 @@
#include "zmq_server.h"
+#include <chrono>
+#include <cstdint>
+#include <system_error>
+
namespace cryptonote
{
+namespace
+{
+ constexpr const int num_zmq_threads = 1;
+ constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
+ constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
+}
+
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- stop_signal(false),
- running(false),
- context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
+ context(zmq_init(num_zmq_threads))
{
+ if (!context)
+ MONERO_ZMQ_THROW("Unable to create ZMQ context");
}
ZmqServer::~ZmqServer()
@@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve()
{
-
- while (1)
+ try
{
- try
+ // socket must close before `zmq_term` will exit.
+ const net::zmq::socket socket = std::move(rep_socket);
+ if (!socket)
{
- zmq::message_t message;
-
- if (!rep_socket)
- {
- throw std::runtime_error("ZMQ RPC server reply socket is null");
- }
- while (rep_socket->recv(&message, 0))
- {
- std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
-
- MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
-
- std::string response = handler.handle(message_string);
-
- zmq::message_t reply(response.size());
- memcpy((void *) reply.data(), response.c_str(), response.size());
-
- rep_socket->send(reply);
- MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
-
- }
- }
- catch (const boost::thread_interrupted& e)
- {
- MDEBUG("ZMQ Server thread interrupted.");
+ MERROR("ZMQ RPC server reply socket is null");
+ return;
}
- catch (const zmq::error_t& e)
+
+ while (1)
{
- MERROR(std::string("ZMQ error: ") + e.what());
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ const std::string& response = handler.handle(message);
+
+ MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
+ MDEBUG("Sent RPC reply: \"" << response << "\"");
}
- boost::this_thread::interruption_point();
+ }
+ catch (const std::system_error& e)
+ {
+ if (e.code() != net::zmq::make_error_code(ETERM))
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (const std::exception& e)
+ {
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (...)
+ {
+ MERROR("Unknown error in ZMQ RPC server");
}
}
-bool ZmqServer::addIPCSocket(std::string address, std::string port)
+bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false;
}
-bool ZmqServer::addTCPSocket(std::string address, std::string port)
+bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
- try
+ if (!context)
{
- std::string addr_prefix("tcp://");
+ MERROR("ZMQ RPC Server already shutdown");
+ return false;
+ }
- rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
+ rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
+ if (!rep_socket)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
+ return false;
+ }
- rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS));
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return false;
+ }
- if (address.empty())
- address = "*";
- if (port.empty())
- port = "*";
- std::string bind_address = addr_prefix + address + std::string(":") + port;
- rep_socket->bind(bind_address.c_str());
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return false;
}
- catch (const std::exception& e)
+
+ if (address.empty())
+ address = "*";
+ if (port.empty())
+ port = "*";
+
+ std::string bind_address = "tcp://";
+ bind_address.append(address.data(), address.size());
+ bind_address += ":";
+ bind_address.append(port.data(), port.size());
+
+ if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{
- MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false;
}
return true;
@@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run()
{
- running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}
void ZmqServer::stop()
{
- if (!running) return;
-
- stop_signal = true;
+ if (!run_thread.joinable())
+ return;
- run_thread.interrupt();
+ context.reset(); // destroying context terminates all calls
run_thread.join();
-
- running = false;
-
- return;
}
diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h
index 1b1e4c7cf..ce7892dab 100644
--- a/src/rpc/zmq_server.h
+++ b/src/rpc/zmq_server.h
@@ -29,12 +29,10 @@
#pragma once
#include <boost/thread/thread.hpp>
-#include <zmq.hpp>
-#include <string>
-#include <memory>
+#include <boost/utility/string_ref.hpp>
#include "common/command_line.h"
-
+#include "net/zmq.h"
#include "rpc_handler.h"
namespace cryptonote
@@ -43,9 +41,6 @@ namespace cryptonote
namespace rpc
{
-static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1;
-static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000;
-
class ZmqServer
{
public:
@@ -58,8 +53,8 @@ class ZmqServer
void serve();
- bool addIPCSocket(std::string address, std::string port);
- bool addTCPSocket(std::string address, std::string port);
+ bool addIPCSocket(boost::string_ref address, boost::string_ref port);
+ bool addTCPSocket(boost::string_ref address, boost::string_ref port);
void run();
void stop();
@@ -67,14 +62,11 @@ class ZmqServer
private:
RpcHandler& handler;
- volatile bool stop_signal;
- volatile bool running;
-
- zmq::context_t context;
+ net::zmq::context context;
boost::thread run_thread;
- std::unique_ptr<zmq::socket_t> rep_socket;
+ net::zmq::socket rep_socket;
};