From 77986023c37737e298fc7c3e9938cce0e10d8b80 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Tue, 5 Sep 2017 12:20:27 -0400 Subject: json serialization for rpc-relevant monero types Structured {de-,}serialization methods for (many new) types which are used for requests or responses in the RPC. New types include RPC requests and responses, and structs which compose types within those. # Conflicts: # src/cryptonote_core/blockchain.cpp --- src/rpc/zmq_server.cpp | 146 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 src/rpc/zmq_server.cpp (limited to 'src/rpc/zmq_server.cpp') diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp new file mode 100644 index 000000000..8e1e47b38 --- /dev/null +++ b/src/rpc/zmq_server.cpp @@ -0,0 +1,146 @@ +// Copyright (c) 2016, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "zmq_server.h" +#include + +namespace cryptonote +{ + +namespace rpc +{ + +ZmqServer::ZmqServer(RpcHandler& h) : + handler(h), + stop_signal(false), + running(false), + context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable +{ +} + +ZmqServer::~ZmqServer() +{ + for (zmq::socket_t* socket : sockets) + { + delete socket; + } +} + +void ZmqServer::serve() +{ + + while (1) + { + try + { + for (zmq::socket_t* socket : sockets) + { + zmq::message_t message; + + while (socket->recv(&message)) + { + std::string message_string(reinterpret_cast(message.data()), message.size()); + + MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); + + std::string response = handler.handle(message_string); + + zmq::message_t reply(response.size()); + memcpy((void *) reply.data(), response.c_str(), response.size()); + + socket->send(reply); + MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); + } + + } + } + catch (boost::thread_interrupted& e) + { + MDEBUG("ZMQ Server thread interrupted."); + } + boost::this_thread::interruption_point(); + } +} + +bool ZmqServer::addIPCSocket(std::string address, std::string port) +{ + MERROR("ZmqServer::addIPCSocket not yet implemented!"); + return false; +} + +bool ZmqServer::addTCPSocket(std::string address, std::string port) +{ + zmq::socket_t *new_socket = nullptr; + try + { + std::string addr_prefix("tcp://"); + + new_socket = new zmq::socket_t(context, ZMQ_REP); + + new_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS); + + std::string bind_address = addr_prefix + address + std::string(":") + port; + new_socket->bind(bind_address.c_str()); + sockets.push_back(new_socket); + } + catch (std::exception& e) + { + MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); + if (new_socket) + { + delete new_socket; + } + return false; + } + return true; +} + +void ZmqServer::run() +{ + running = true; + run_thread = boost::thread(boost::bind(&ZmqServer::serve, this)); +} + +void ZmqServer::stop() +{ + if (!running) return; + + stop_signal = true; + + run_thread.interrupt(); + run_thread.join(); + + running = false; + + return; +} + + +} // namespace cryptonote + +} // namespace rpc -- cgit v1.2.3 From 0299cb77ca18073daf3cf371f8da013fb596ae48 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Tue, 5 Sep 2017 12:20:40 -0400 Subject: Fix various oversights/bugs in ZMQ RPC server code - Add some RPC commands (and touch up a couple others) - some bounds checking - some better pointer management - const correctness and error handling -- Thanks @vtnerd for type help with serialization and CMake changes --- src/rpc/zmq_server.cpp | 51 +++++++++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 28 deletions(-) (limited to 'src/rpc/zmq_server.cpp') diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 8e1e47b38..afdff2328 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -45,10 +45,6 @@ ZmqServer::ZmqServer(RpcHandler& h) : ZmqServer::~ZmqServer() { - for (zmq::socket_t* socket : sockets) - { - delete socket; - } } void ZmqServer::serve() @@ -58,31 +54,36 @@ void ZmqServer::serve() { try { - for (zmq::socket_t* socket : sockets) - { - zmq::message_t message; + zmq::message_t message; - while (socket->recv(&message)) - { - std::string message_string(reinterpret_cast(message.data()), message.size()); + if (!rep_socket) + { + throw std::runtime_error("ZMQ RPC server reply socket is null"); + } + while (rep_socket->recv(&message)) + { + std::string message_string(reinterpret_cast(message.data()), message.size()); - MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); + MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); - std::string response = handler.handle(message_string); + std::string response = handler.handle(message_string); - zmq::message_t reply(response.size()); - memcpy((void *) reply.data(), response.c_str(), response.size()); + zmq::message_t reply(response.size()); + memcpy((void *) reply.data(), response.c_str(), response.size()); - socket->send(reply); - MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); - } + rep_socket->send(reply); + MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); } } - catch (boost::thread_interrupted& e) + catch (const boost::thread_interrupted& e) { MDEBUG("ZMQ Server thread interrupted."); } + catch (const zmq::error_t& e) + { + MERROR(std::string("ZMQ error: ") + e.what()); + } boost::this_thread::interruption_point(); } } @@ -95,26 +96,20 @@ bool ZmqServer::addIPCSocket(std::string address, std::string port) bool ZmqServer::addTCPSocket(std::string address, std::string port) { - zmq::socket_t *new_socket = nullptr; try { std::string addr_prefix("tcp://"); - new_socket = new zmq::socket_t(context, ZMQ_REP); + rep_socket.reset(new zmq::socket_t(context, ZMQ_REP)); - new_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS); + rep_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS); std::string bind_address = addr_prefix + address + std::string(":") + port; - new_socket->bind(bind_address.c_str()); - sockets.push_back(new_socket); + rep_socket->bind(bind_address.c_str()); } - catch (std::exception& e) + catch (const std::exception& e) { MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); - if (new_socket) - { - delete new_socket; - } return false; } return true; -- cgit v1.2.3