aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluigi1111 <luigi1111w@gmail.com>2019-09-14 13:25:14 -0500
committerluigi1111 <luigi1111w@gmail.com>2019-09-14 13:25:14 -0500
commitd663e1e3dbda1f12e2b0ea9ed74cfba2a953216e (patch)
treee5102407744d67a3cd4173840ffb728b880a55ee
parentMerge pull request #5899 (diff)
parentDropping cppzmq dependency; adding some zmq utils (diff)
downloadmonero-d663e1e3dbda1f12e2b0ea9ed74cfba2a953216e.tar.xz
Merge pull request #5818
f91a06c Dropping cppzmq dependency; adding some zmq utils (vtnerd)
-rw-r--r--CMakeLists.txt4
-rw-r--r--README.md2
-rw-r--r--src/net/CMakeLists.txt6
-rw-r--r--src/net/zmq.cpp188
-rw-r--r--src/net/zmq.h136
-rw-r--r--src/rpc/CMakeLists.txt2
-rw-r--r--src/rpc/zmq_server.cpp136
-rw-r--r--src/rpc/zmq_server.h20
-rw-r--r--tests/unit_tests/CMakeLists.txt3
-rw-r--r--tests/unit_tests/net.cpp130
10 files changed, 550 insertions, 77 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e458d8ead..d5bf7af62 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -979,14 +979,14 @@ if(CMAKE_C_COMPILER_ID STREQUAL "Clang" AND ARCH_WIDTH EQUAL "32" AND NOT IOS AN
endif()
endif()
-find_path(ZMQ_INCLUDE_PATH zmq.hpp)
+find_path(ZMQ_INCLUDE_PATH zmq.h)
find_library(ZMQ_LIB zmq)
find_library(PGM_LIBRARY pgm)
find_library(NORM_LIBRARY norm)
find_library(SODIUM_LIBRARY sodium)
if(NOT ZMQ_INCLUDE_PATH)
- message(FATAL_ERROR "Could not find required header zmq.hpp")
+ message(FATAL_ERROR "Could not find required header zmq.h")
endif()
if(NOT ZMQ_LIB)
message(FATAL_ERROR "Could not find required libzmq")
diff --git a/README.md b/README.md
index 322f6e783..0f977ca2e 100644
--- a/README.md
+++ b/README.md
@@ -181,7 +181,7 @@ library archives (`.a`).
| pkg-config | any | NO | `pkg-config` | `base-devel` | `pkgconf` | NO | |
| Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | NO | C++ libraries |
| OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `openssl-devel` | NO | sha256 sum |
-| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `cppzmq-devel` | NO | ZeroMQ library |
+| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | NO | ZeroMQ library |
| OpenPGM | ? | NO | `libpgm-dev` | `libpgm` | `openpgm-devel` | NO | For ZeroMQ |
| libnorm[2] | ? | NO | `libnorm-dev` | | ` | YES | For ZeroMQ |
| libunbound | 1.4.16 | YES | `libunbound-dev` | `unbound` | `unbound-devel` | NO | DNS resolver |
diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt
index 24b707f77..339587ffa 100644
--- a/src/net/CMakeLists.txt
+++ b/src/net/CMakeLists.txt
@@ -26,8 +26,10 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp)
-set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h)
+set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp
+ socks_connect.cpp tor_address.cpp zmq.cpp)
+set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h
+ tor_address.h zmq.h)
monero_add_library(net ${net_sources} ${net_headers})
target_link_libraries(net common epee ${Boost_ASIO_LIBRARY})
diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp
new file mode 100644
index 000000000..d02a22983
--- /dev/null
+++ b/src/net/zmq.cpp
@@ -0,0 +1,188 @@
+// Copyright (c) 2019, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include "net/zmq.h"
+
+#include <cassert>
+#include <cerrno>
+#include <limits>
+#include <utility>
+
+namespace net
+{
+namespace zmq
+{
+ const std::error_category& error_category() noexcept
+ {
+ struct category final : std::error_category
+ {
+ virtual const char* name() const noexcept override final
+ {
+ return "error::error_category()";
+ }
+
+ virtual std::string message(int value) const override final
+ {
+ char const* const msg = zmq_strerror(value);
+ if (msg)
+ return msg;
+ return "zmq_strerror failure";
+ }
+
+ virtual std::error_condition default_error_condition(int value) const noexcept override final
+ {
+ // maps specific errors to generic `std::errc` cases.
+ switch (value)
+ {
+ case EFSM:
+ case ETERM:
+ break;
+ default:
+ /* zmq is using cerrno errors. C++ spec indicates that
+ `std::errc` values must be identical to the cerrno value.
+ So just map every zmq specific error to the generic errc
+ equivalent. zmq extensions must be in the switch or they
+ map to a non-existent errc enum value. */
+ return std::errc(value);
+ }
+ return std::error_condition{value, *this};
+ }
+
+ };
+ static const category instance{};
+ return instance;
+ }
+
+ void terminate::call(void* ptr) noexcept
+ {
+ assert(ptr != nullptr); // see header
+ while (zmq_term(ptr))
+ {
+ if (zmq_errno() != EINTR)
+ break;
+ }
+ }
+
+ namespace
+ {
+ //! RAII wrapper for `zmq_msg_t`.
+ class message
+ {
+ zmq_msg_t handle_;
+
+ public:
+ message() noexcept
+ : handle_()
+ {
+ zmq_msg_init(handle());
+ }
+
+ message(message&& rhs) = delete;
+ message(const message& rhs) = delete;
+ message& operator=(message&& rhs) = delete;
+ message& operator=(const message& rhs) = delete;
+
+ ~message() noexcept
+ {
+ zmq_msg_close(handle());
+ }
+
+ zmq_msg_t* handle() noexcept
+ {
+ return std::addressof(handle_);
+ }
+
+ const char* data() noexcept
+ {
+ return static_cast<const char*>(zmq_msg_data(handle()));
+ }
+
+ std::size_t size() noexcept
+ {
+ return zmq_msg_size(handle());
+ }
+ };
+
+ struct do_receive
+ {
+ /* ZMQ documentation states that message parts are atomic - either
+ all are received or none are. Looking through ZMQ code and
+ Github discussions indicates that after part 1 is returned,
+ `EAGAIN` cannot be returned to meet these guarantees. Unit tests
+ verify (for the `inproc://` case) that this is the behavior.
+ Therefore, read errors after the first part are treated as a
+ failure for the entire message (probably `ETERM`). */
+ int operator()(std::string& payload, void* const socket, const int flags) const
+ {
+ static constexpr const int max_out = std::numeric_limits<int>::max();
+ const std::string::size_type initial = payload.size();
+ message part{};
+ for (;;)
+ {
+ int last = 0;
+ if ((last = zmq_msg_recv(part.handle(), socket, flags)) < 0)
+ return last;
+
+ payload.append(part.data(), part.size());
+ if (!zmq_msg_more(part.handle()))
+ break;
+ }
+ const std::string::size_type added = payload.size() - initial;
+ return unsigned(max_out) < added ? max_out : int(added);
+ }
+ };
+
+ template<typename F, typename... T>
+ expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
+ {
+ for (;;)
+ {
+ if (0 <= op(args...))
+ return success();
+
+ const int error = zmq_errno();
+ if (error != EINTR)
+ return make_error_code(error);
+ }
+ }
+ } // anonymous
+
+ expect<std::string> receive(void* const socket, const int flags)
+ {
+ std::string payload{};
+ MONERO_CHECK(retry_op(do_receive{}, payload, socket, flags));
+ return {std::move(payload)};
+ }
+
+ expect<void> send(const epee::span<const std::uint8_t> payload, void* const socket, const int flags) noexcept
+ {
+ return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
+ }
+} // zmq
+} // net
+
diff --git a/src/net/zmq.h b/src/net/zmq.h
new file mode 100644
index 000000000..c6a7fd743
--- /dev/null
+++ b/src/net/zmq.h
@@ -0,0 +1,136 @@
+// Copyright (c) 2019, The Monero Project
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+// of conditions and the following disclaimer in the documentation and/or other
+// materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+// used to endorse or promote products derived from this software without specific
+// prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include <memory>
+#include <string>
+#include <system_error>
+#include <zmq.h>
+
+#include "common/expect.h"
+#include "span.h"
+
+//! If the expression is less than 0, return the current ZMQ error code.
+#define MONERO_ZMQ_CHECK(...) \
+ do \
+ { \
+ if (( __VA_ARGS__ ) < 0) \
+ return {::net::zmq::get_error_code()}; \
+ } while (0)
+
+//! Print a message followed by the current ZMQ error message.
+#define MONERO_LOG_ZMQ_ERROR(...) \
+ do \
+ { \
+ MERROR( __VA_ARGS__ << ": " << ::net::zmq::get_error_code().message()); \
+ } while (0)
+
+//! Throw an exception with a custom `msg`, current ZMQ error code, filename, and line number.
+#define MONERO_ZMQ_THROW(msg) \
+ MONERO_THROW( ::net::zmq::get_error_code(), msg )
+
+namespace net
+{
+namespace zmq
+{
+ //! \return Category for ZMQ errors.
+ const std::error_category& error_category() noexcept;
+
+ //! \return `code` (usally from zmq_errno()`) using `net::zmq::error_category()`.
+ inline std::error_code make_error_code(int code) noexcept
+ {
+ return std::error_code{code, error_category()};
+ }
+
+ //! \return Error from `zmq_errno()` using `net::zmq::error_category()`.
+ inline std::error_code get_error_code() noexcept
+ {
+ return make_error_code(zmq_errno());
+ }
+
+ //! Calls `zmq_term`
+ class terminate
+ {
+ static void call(void* ptr) noexcept;
+ public:
+ void operator()(void* ptr) const noexcept
+ {
+ if (ptr)
+ call(ptr);
+ }
+ };
+
+ //! Calls `zmq_close`
+ struct close
+ {
+ void operator()(void* ptr) const noexcept
+ {
+ if (ptr)
+ zmq_close(ptr);
+ }
+ };
+
+ //! Unique ZMQ context handle, calls `zmq_term` on destruction.
+ using context = std::unique_ptr<void, terminate>;
+
+ //! Unique ZMQ socket handle, calls `zmq_close` on destruction.
+ using socket = std::unique_ptr<void, close>;
+
+ /*! Read all parts of the next message on `socket`. Blocks until the entire
+ next message (all parts) are read, or until `zmq_term` is called on the
+ `zmq_context` associated with `socket`. If the context is terminated,
+ `make_error_code(ETERM)` is returned.
+
+ \note This will automatically retry on `EINTR`, so exiting on
+ interrupts requires context termination.
+ \note If non-blocking behavior is requested on `socket` or by `flags`,
+ then `net::zmq::make_error_code(EAGAIN)` will be returned if this
+ would block.
+
+ \param socket Handle created with `zmq_socket`.
+ \param flags See `zmq_msg_read` for possible flags.
+ \return Message payload read from `socket` or ZMQ error. */
+ expect<std::string> receive(void* socket, int flags = 0);
+
+ /*! Sends `payload` on `socket`. Blocks until the entire message is queued
+ for sending, or until `zmq_term` is called on the `zmq_context`
+ associated with `socket`. If the context is terminated,
+ `make_error_code(ETERM)` is returned.
+
+ \note This will automatically retry on `EINTR`, so exiting on
+ interrupts requires context termination.
+ \note If non-blocking behavior is requested on `socket` or by `flags`,
+ then `net::zmq::make_error_code(EAGAIN)` will be returned if this
+ would block.
+
+ \param payload sent as one message on `socket`.
+ \param socket Handle created with `zmq_socket`.
+ \param flags See `zmq_send` for possible flags.
+ \return `success()` if sent, otherwise ZMQ error. */
+ expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
+} // zmq
+} // net
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt
index d98670e05..116e7f568 100644
--- a/src/rpc/CMakeLists.txt
+++ b/src/rpc/CMakeLists.txt
@@ -26,6 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+include_directories(SYSTEM ${ZMQ_INCLUDE_PATH})
+
set(rpc_base_sources
rpc_args.cpp)
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 668a2e5cd..1ee55673e 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -28,18 +28,29 @@
#include "zmq_server.h"
+#include <chrono>
+#include <cstdint>
+#include <system_error>
+
namespace cryptonote
{
+namespace
+{
+ constexpr const int num_zmq_threads = 1;
+ constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
+ constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
+}
+
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- stop_signal(false),
- running(false),
- context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
+ context(zmq_init(num_zmq_threads))
{
+ if (!context)
+ MONERO_ZMQ_THROW("Unable to create ZMQ context");
}
ZmqServer::~ZmqServer()
@@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve()
{
-
- while (1)
+ try
{
- try
+ // socket must close before `zmq_term` will exit.
+ const net::zmq::socket socket = std::move(rep_socket);
+ if (!socket)
{
- zmq::message_t message;
-
- if (!rep_socket)
- {
- throw std::runtime_error("ZMQ RPC server reply socket is null");
- }
- while (rep_socket->recv(&message, 0))
- {
- std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
-
- MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
-
- std::string response = handler.handle(message_string);
-
- zmq::message_t reply(response.size());
- memcpy((void *) reply.data(), response.c_str(), response.size());
-
- rep_socket->send(reply);
- MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
-
- }
- }
- catch (const boost::thread_interrupted& e)
- {
- MDEBUG("ZMQ Server thread interrupted.");
+ MERROR("ZMQ RPC server reply socket is null");
+ return;
}
- catch (const zmq::error_t& e)
+
+ while (1)
{
- MERROR(std::string("ZMQ error: ") + e.what());
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ const std::string& response = handler.handle(message);
+
+ MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
+ MDEBUG("Sent RPC reply: \"" << response << "\"");
}
- boost::this_thread::interruption_point();
+ }
+ catch (const std::system_error& e)
+ {
+ if (e.code() != net::zmq::make_error_code(ETERM))
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (const std::exception& e)
+ {
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (...)
+ {
+ MERROR("Unknown error in ZMQ RPC server");
}
}
-bool ZmqServer::addIPCSocket(std::string address, std::string port)
+bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false;
}
-bool ZmqServer::addTCPSocket(std::string address, std::string port)
+bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
- try
+ if (!context)
{
- std::string addr_prefix("tcp://");
+ MERROR("ZMQ RPC Server already shutdown");
+ return false;
+ }
- rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
+ rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
+ if (!rep_socket)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
+ return false;
+ }
- rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS));
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return false;
+ }
- if (address.empty())
- address = "*";
- if (port.empty())
- port = "*";
- std::string bind_address = addr_prefix + address + std::string(":") + port;
- rep_socket->bind(bind_address.c_str());
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return false;
}
- catch (const std::exception& e)
+
+ if (address.empty())
+ address = "*";
+ if (port.empty())
+ port = "*";
+
+ std::string bind_address = "tcp://";
+ bind_address.append(address.data(), address.size());
+ bind_address += ":";
+ bind_address.append(port.data(), port.size());
+
+ if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{
- MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false;
}
return true;
@@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run()
{
- running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}
void ZmqServer::stop()
{
- if (!running) return;
-
- stop_signal = true;
+ if (!run_thread.joinable())
+ return;
- run_thread.interrupt();
+ context.reset(); // destroying context terminates all calls
run_thread.join();
-
- running = false;
-
- return;
}
diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h
index 1b1e4c7cf..ce7892dab 100644
--- a/src/rpc/zmq_server.h
+++ b/src/rpc/zmq_server.h
@@ -29,12 +29,10 @@
#pragma once
#include <boost/thread/thread.hpp>
-#include <zmq.hpp>
-#include <string>
-#include <memory>
+#include <boost/utility/string_ref.hpp>
#include "common/command_line.h"
-
+#include "net/zmq.h"
#include "rpc_handler.h"
namespace cryptonote
@@ -43,9 +41,6 @@ namespace cryptonote
namespace rpc
{
-static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1;
-static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000;
-
class ZmqServer
{
public:
@@ -58,8 +53,8 @@ class ZmqServer
void serve();
- bool addIPCSocket(std::string address, std::string port);
- bool addTCPSocket(std::string address, std::string port);
+ bool addIPCSocket(boost::string_ref address, boost::string_ref port);
+ bool addTCPSocket(boost::string_ref address, boost::string_ref port);
void run();
void stop();
@@ -67,14 +62,11 @@ class ZmqServer
private:
RpcHandler& handler;
- volatile bool stop_signal;
- volatile bool running;
-
- zmq::context_t context;
+ net::zmq::context context;
boost::thread run_thread;
- std::unique_ptr<zmq::socket_t> rep_socket;
+ net::zmq::socket rep_socket;
};
diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt
index a5d179040..cac1fa943 100644
--- a/tests/unit_tests/CMakeLists.txt
+++ b/tests/unit_tests/CMakeLists.txt
@@ -117,7 +117,8 @@ target_link_libraries(unit_tests
${Boost_THREAD_LIBRARY}
${GTEST_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
- ${EXTRA_LIBRARIES})
+ ${EXTRA_LIBRARIES}
+ ${ZMQ_LIB})
set_property(TARGET unit_tests
PROPERTY
FOLDER "tests")
diff --git a/tests/unit_tests/net.cpp b/tests/unit_tests/net.cpp
index 7e6ba4f03..253280d4d 100644
--- a/tests/unit_tests/net.cpp
+++ b/tests/unit_tests/net.cpp
@@ -40,6 +40,7 @@
#include <boost/range/adaptor/sliced.hpp>
#include <boost/range/combine.hpp>
#include <boost/system/error_code.hpp>
+#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/thread.hpp>
#include <boost/uuid/nil_generator.hpp>
#include <boost/uuid/random_generator.hpp>
@@ -59,6 +60,7 @@
#include "net/socks_connect.h"
#include "net/parse.h"
#include "net/tor_address.h"
+#include "net/zmq.h"
#include "p2p/net_peerlist_boost_serialization.h"
#include "serialization/keyvalue_serialization.h"
#include "storages/portable_storage.h"
@@ -1259,3 +1261,131 @@ TEST(dandelionpp_map, dropped_all_connections)
EXPECT_EQ(3u, entry.second);
}
}
+
+TEST(zmq, error_codes)
+{
+ EXPECT_EQ(
+ std::addressof(net::zmq::error_category()),
+ std::addressof(net::zmq::make_error_code(0).category())
+ );
+ EXPECT_EQ(
+ std::make_error_condition(std::errc::not_a_socket),
+ net::zmq::make_error_code(ENOTSOCK)
+ );
+
+ EXPECT_TRUE(
+ []() -> expect<void>
+ {
+ MONERO_ZMQ_CHECK(zmq_msg_send(nullptr, nullptr, 0));
+ return success();
+ }().matches(std::errc::not_a_socket)
+ );
+
+ bool thrown = false;
+ try
+ {
+ MONERO_ZMQ_THROW("stuff");
+ }
+ catch (const std::system_error& e)
+ {
+ thrown = true;
+ EXPECT_EQ(std::make_error_condition(std::errc::not_a_socket), e.code());
+ }
+ EXPECT_TRUE(thrown);
+}
+
+TEST(zmq, read_write)
+{
+ net::zmq::context context{zmq_init(1)};
+ ASSERT_NE(nullptr, context);
+
+ net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
+ net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
+ ASSERT_NE(nullptr, send_socket);
+ ASSERT_NE(nullptr, recv_socket);
+
+ ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
+ ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
+
+ std::string message;
+ message.resize(1024);
+ crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
+
+ ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get())));
+
+ const expect<std::string> received = net::zmq::receive(recv_socket.get());
+ ASSERT_TRUE(bool(received));
+ EXPECT_EQ(message, *received);
+}
+
+TEST(zmq, read_write_multipart)
+{
+ net::zmq::context context{zmq_init(1)};
+ ASSERT_NE(nullptr, context);
+
+ net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
+ net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
+ ASSERT_NE(nullptr, send_socket);
+ ASSERT_NE(nullptr, recv_socket);
+
+ ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
+ ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
+
+ std::string message;
+ message.resize(999);
+ crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
+
+ for (unsigned i = 0; i < 3; ++i)
+ {
+ const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
+ ASSERT_FALSE(bool(received));
+ EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
+
+ const epee::span<const std::uint8_t> bytes{
+ reinterpret_cast<const std::uint8_t*>(std::addressof(message[0])) + (i * 333), 333
+ };
+ ASSERT_TRUE(bool(net::zmq::send(bytes, send_socket.get(), (i == 2 ? 0 : ZMQ_SNDMORE))));
+ }
+
+ const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
+ ASSERT_TRUE(bool(received));
+ EXPECT_EQ(message, *received);
+}
+
+TEST(zmq, read_write_termination)
+{
+ net::zmq::context context{zmq_init(1)};
+ ASSERT_NE(nullptr, context);
+
+ // must be declared before sockets and after context
+ boost::scoped_thread<> thread{};
+
+ net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
+ net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
+ ASSERT_NE(nullptr, send_socket);
+ ASSERT_NE(nullptr, recv_socket);
+
+ ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
+ ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
+
+ std::string message;
+ message.resize(1024);
+ crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
+
+ ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get(), ZMQ_SNDMORE)));
+
+ expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
+ ASSERT_FALSE(bool(received));
+ EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
+
+ thread = boost::scoped_thread<>{
+ boost::thread{
+ [&context] () { context.reset(); }
+ }
+ };
+
+ received = net::zmq::receive(recv_socket.get());
+ ASSERT_FALSE(bool(received));
+ EXPECT_EQ(net::zmq::make_error_code(ETERM), received.error());
+}
+