diff options
Diffstat (limited to '')
-rw-r--r-- | src/rpc/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/rpc/zmq_server.cpp | 136 | ||||
-rw-r--r-- | src/rpc/zmq_server.h | 20 |
3 files changed, 87 insertions, 71 deletions
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index d98670e05..116e7f568 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -26,6 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +include_directories(SYSTEM ${ZMQ_INCLUDE_PATH}) + set(rpc_base_sources rpc_args.cpp) diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 668a2e5cd..1ee55673e 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -28,18 +28,29 @@ #include "zmq_server.h" +#include <chrono> +#include <cstdint> +#include <system_error> + namespace cryptonote { +namespace +{ + constexpr const int num_zmq_threads = 1; + constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB + constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages +} + namespace rpc { ZmqServer::ZmqServer(RpcHandler& h) : handler(h), - stop_signal(false), - running(false), - context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable + context(zmq_init(num_zmq_threads)) { + if (!context) + MONERO_ZMQ_THROW("Unable to create ZMQ context"); } ZmqServer::~ZmqServer() @@ -48,71 +59,88 @@ ZmqServer::~ZmqServer() void ZmqServer::serve() { - - while (1) + try { - try + // socket must close before `zmq_term` will exit. + const net::zmq::socket socket = std::move(rep_socket); + if (!socket) { - zmq::message_t message; - - if (!rep_socket) - { - throw std::runtime_error("ZMQ RPC server reply socket is null"); - } - while (rep_socket->recv(&message, 0)) - { - std::string message_string(reinterpret_cast<const char *>(message.data()), message.size()); - - MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); - - std::string response = handler.handle(message_string); - - zmq::message_t reply(response.size()); - memcpy((void *) reply.data(), response.c_str(), response.size()); - - rep_socket->send(reply); - MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); - - } - } - catch (const boost::thread_interrupted& e) - { - MDEBUG("ZMQ Server thread interrupted."); + MERROR("ZMQ RPC server reply socket is null"); + return; } - catch (const zmq::error_t& e) + + while (1) { - MERROR(std::string("ZMQ error: ") + e.what()); + const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get())); + MDEBUG("Received RPC request: \"" << message << "\""); + const std::string& response = handler.handle(message); + + MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get())); + MDEBUG("Sent RPC reply: \"" << response << "\""); } - boost::this_thread::interruption_point(); + } + catch (const std::system_error& e) + { + if (e.code() != net::zmq::make_error_code(ETERM)) + MERROR("ZMQ RPC Server Error: " << e.what()); + } + catch (const std::exception& e) + { + MERROR("ZMQ RPC Server Error: " << e.what()); + } + catch (...) + { + MERROR("Unknown error in ZMQ RPC server"); } } -bool ZmqServer::addIPCSocket(std::string address, std::string port) +bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port) { MERROR("ZmqServer::addIPCSocket not yet implemented!"); return false; } -bool ZmqServer::addTCPSocket(std::string address, std::string port) +bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) { - try + if (!context) { - std::string addr_prefix("tcp://"); + MERROR("ZMQ RPC Server already shutdown"); + return false; + } - rep_socket.reset(new zmq::socket_t(context, ZMQ_REP)); + rep_socket.reset(zmq_socket(context.get(), ZMQ_REP)); + if (!rep_socket) + { + MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed"); + return false; + } - rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS)); + if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); + return false; + } - if (address.empty()) - address = "*"; - if (port.empty()) - port = "*"; - std::string bind_address = addr_prefix + address + std::string(":") + port; - rep_socket->bind(bind_address.c_str()); + static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); + if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); + return false; } - catch (const std::exception& e) + + if (address.empty()) + address = "*"; + if (port.empty()) + port = "*"; + + std::string bind_address = "tcp://"; + bind_address.append(address.data(), address.size()); + bind_address += ":"; + bind_address.append(port.data(), port.size()); + + if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0) { - MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); + MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed"); return false; } return true; @@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port) void ZmqServer::run() { - running = true; run_thread = boost::thread(boost::bind(&ZmqServer::serve, this)); } void ZmqServer::stop() { - if (!running) return; - - stop_signal = true; + if (!run_thread.joinable()) + return; - run_thread.interrupt(); + context.reset(); // destroying context terminates all calls run_thread.join(); - - running = false; - - return; } diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h index 1b1e4c7cf..ce7892dab 100644 --- a/src/rpc/zmq_server.h +++ b/src/rpc/zmq_server.h @@ -29,12 +29,10 @@ #pragma once #include <boost/thread/thread.hpp> -#include <zmq.hpp> -#include <string> -#include <memory> +#include <boost/utility/string_ref.hpp> #include "common/command_line.h" - +#include "net/zmq.h" #include "rpc_handler.h" namespace cryptonote @@ -43,9 +41,6 @@ namespace cryptonote namespace rpc { -static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1; -static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000; - class ZmqServer { public: @@ -58,8 +53,8 @@ class ZmqServer void serve(); - bool addIPCSocket(std::string address, std::string port); - bool addTCPSocket(std::string address, std::string port); + bool addIPCSocket(boost::string_ref address, boost::string_ref port); + bool addTCPSocket(boost::string_ref address, boost::string_ref port); void run(); void stop(); @@ -67,14 +62,11 @@ class ZmqServer private: RpcHandler& handler; - volatile bool stop_signal; - volatile bool running; - - zmq::context_t context; + net::zmq::context context; boost::thread run_thread; - std::unique_ptr<zmq::socket_t> rep_socket; + net::zmq::socket rep_socket; }; |