aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/zmq_server.cpp
diff options
context:
space:
mode:
authorLee Clagett <code@leeclagett.com>2019-08-18 13:49:02 -0400
committerLee Clagett <code@leeclagett.com>2019-07-22 06:37:16 +0000
commitf91a06c6d7ec3d649b6bdb7a38bb1e0b8f7293f9 (patch)
tree9d3eeae2f54baf34a54376c8a51f09501b03f991 /src/rpc/zmq_server.cpp
parentMerge pull request #5824 (diff)
downloadmonero-f91a06c6d7ec3d649b6bdb7a38bb1e0b8f7293f9.tar.xz
Dropping cppzmq dependency; adding some zmq utils
Diffstat (limited to '')
-rw-r--r--src/rpc/zmq_server.cpp136
1 files changed, 79 insertions, 57 deletions
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 668a2e5cd..1ee55673e 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -28,18 +28,29 @@
#include "zmq_server.h"
+#include <chrono>
+#include <cstdint>
+#include <system_error>
+
namespace cryptonote
{
+namespace
+{
+ constexpr const int num_zmq_threads = 1;
+ constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
+ constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
+}
+
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- stop_signal(false),
- running(false),
- context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
+ context(zmq_init(num_zmq_threads))
{
+ if (!context)
+ MONERO_ZMQ_THROW("Unable to create ZMQ context");
}
ZmqServer::~ZmqServer()
@@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve()
{
-
- while (1)
+ try
{
- try
+ // socket must close before `zmq_term` will exit.
+ const net::zmq::socket socket = std::move(rep_socket);
+ if (!socket)
{
- zmq::message_t message;
-
- if (!rep_socket)
- {
- throw std::runtime_error("ZMQ RPC server reply socket is null");
- }
- while (rep_socket->recv(&message, 0))
- {
- std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
-
- MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
-
- std::string response = handler.handle(message_string);
-
- zmq::message_t reply(response.size());
- memcpy((void *) reply.data(), response.c_str(), response.size());
-
- rep_socket->send(reply);
- MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
-
- }
- }
- catch (const boost::thread_interrupted& e)
- {
- MDEBUG("ZMQ Server thread interrupted.");
+ MERROR("ZMQ RPC server reply socket is null");
+ return;
}
- catch (const zmq::error_t& e)
+
+ while (1)
{
- MERROR(std::string("ZMQ error: ") + e.what());
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ const std::string& response = handler.handle(message);
+
+ MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
+ MDEBUG("Sent RPC reply: \"" << response << "\"");
}
- boost::this_thread::interruption_point();
+ }
+ catch (const std::system_error& e)
+ {
+ if (e.code() != net::zmq::make_error_code(ETERM))
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (const std::exception& e)
+ {
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (...)
+ {
+ MERROR("Unknown error in ZMQ RPC server");
}
}
-bool ZmqServer::addIPCSocket(std::string address, std::string port)
+bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false;
}
-bool ZmqServer::addTCPSocket(std::string address, std::string port)
+bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
- try
+ if (!context)
{
- std::string addr_prefix("tcp://");
+ MERROR("ZMQ RPC Server already shutdown");
+ return false;
+ }
- rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
+ rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
+ if (!rep_socket)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
+ return false;
+ }
- rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS));
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return false;
+ }
- if (address.empty())
- address = "*";
- if (port.empty())
- port = "*";
- std::string bind_address = addr_prefix + address + std::string(":") + port;
- rep_socket->bind(bind_address.c_str());
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return false;
}
- catch (const std::exception& e)
+
+ if (address.empty())
+ address = "*";
+ if (port.empty())
+ port = "*";
+
+ std::string bind_address = "tcp://";
+ bind_address.append(address.data(), address.size());
+ bind_address += ":";
+ bind_address.append(port.data(), port.size());
+
+ if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{
- MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false;
}
return true;
@@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run()
{
- running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}
void ZmqServer::stop()
{
- if (!running) return;
-
- stop_signal = true;
+ if (!run_thread.joinable())
+ return;
- run_thread.interrupt();
+ context.reset(); // destroying context terminates all calls
run_thread.join();
-
- running = false;
-
- return;
}