diff options
Diffstat (limited to 'src/rpc/zmq_server.cpp')
-rw-r--r-- | src/rpc/zmq_server.cpp | 51 |
1 files changed, 23 insertions, 28 deletions
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 8e1e47b38..afdff2328 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -45,10 +45,6 @@ ZmqServer::ZmqServer(RpcHandler& h) : ZmqServer::~ZmqServer() { - for (zmq::socket_t* socket : sockets) - { - delete socket; - } } void ZmqServer::serve() @@ -58,31 +54,36 @@ void ZmqServer::serve() { try { - for (zmq::socket_t* socket : sockets) - { - zmq::message_t message; + zmq::message_t message; - while (socket->recv(&message)) - { - std::string message_string(reinterpret_cast<const char *>(message.data()), message.size()); + if (!rep_socket) + { + throw std::runtime_error("ZMQ RPC server reply socket is null"); + } + while (rep_socket->recv(&message)) + { + std::string message_string(reinterpret_cast<const char *>(message.data()), message.size()); - MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); + MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); - std::string response = handler.handle(message_string); + std::string response = handler.handle(message_string); - zmq::message_t reply(response.size()); - memcpy((void *) reply.data(), response.c_str(), response.size()); + zmq::message_t reply(response.size()); + memcpy((void *) reply.data(), response.c_str(), response.size()); - socket->send(reply); - MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); - } + rep_socket->send(reply); + MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); } } - catch (boost::thread_interrupted& e) + catch (const boost::thread_interrupted& e) { MDEBUG("ZMQ Server thread interrupted."); } + catch (const zmq::error_t& e) + { + MERROR(std::string("ZMQ error: ") + e.what()); + } boost::this_thread::interruption_point(); } } @@ -95,26 +96,20 @@ bool ZmqServer::addIPCSocket(std::string address, std::string port) bool ZmqServer::addTCPSocket(std::string address, std::string port) { - zmq::socket_t *new_socket = nullptr; try { std::string addr_prefix("tcp://"); - new_socket = new zmq::socket_t(context, ZMQ_REP); + rep_socket.reset(new zmq::socket_t(context, ZMQ_REP)); - new_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS); + rep_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS); std::string bind_address = addr_prefix + address + std::string(":") + port; - new_socket->bind(bind_address.c_str()); - sockets.push_back(new_socket); + rep_socket->bind(bind_address.c_str()); } - catch (std::exception& e) + catch (const std::exception& e) { MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); - if (new_socket) - { - delete new_socket; - } return false; } return true; |