diff options
Diffstat (limited to '')
-rw-r--r-- | src/net/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/net/zmq.cpp | 188 | ||||
-rw-r--r-- | src/net/zmq.h | 136 |
3 files changed, 328 insertions, 2 deletions
diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt index 24b707f77..339587ffa 100644 --- a/src/net/CMakeLists.txt +++ b/src/net/CMakeLists.txt @@ -26,8 +26,10 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp) -set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h) +set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp + socks_connect.cpp tor_address.cpp zmq.cpp) +set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h + tor_address.h zmq.h) monero_add_library(net ${net_sources} ${net_headers}) target_link_libraries(net common epee ${Boost_ASIO_LIBRARY}) diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp new file mode 100644 index 000000000..d02a22983 --- /dev/null +++ b/src/net/zmq.cpp @@ -0,0 +1,188 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "net/zmq.h" + +#include <cassert> +#include <cerrno> +#include <limits> +#include <utility> + +namespace net +{ +namespace zmq +{ + const std::error_category& error_category() noexcept + { + struct category final : std::error_category + { + virtual const char* name() const noexcept override final + { + return "error::error_category()"; + } + + virtual std::string message(int value) const override final + { + char const* const msg = zmq_strerror(value); + if (msg) + return msg; + return "zmq_strerror failure"; + } + + virtual std::error_condition default_error_condition(int value) const noexcept override final + { + // maps specific errors to generic `std::errc` cases. + switch (value) + { + case EFSM: + case ETERM: + break; + default: + /* zmq is using cerrno errors. C++ spec indicates that + `std::errc` values must be identical to the cerrno value. + So just map every zmq specific error to the generic errc + equivalent. zmq extensions must be in the switch or they + map to a non-existent errc enum value. */ + return std::errc(value); + } + return std::error_condition{value, *this}; + } + + }; + static const category instance{}; + return instance; + } + + void terminate::call(void* ptr) noexcept + { + assert(ptr != nullptr); // see header + while (zmq_term(ptr)) + { + if (zmq_errno() != EINTR) + break; + } + } + + namespace + { + //! RAII wrapper for `zmq_msg_t`. + class message + { + zmq_msg_t handle_; + + public: + message() noexcept + : handle_() + { + zmq_msg_init(handle()); + } + + message(message&& rhs) = delete; + message(const message& rhs) = delete; + message& operator=(message&& rhs) = delete; + message& operator=(const message& rhs) = delete; + + ~message() noexcept + { + zmq_msg_close(handle()); + } + + zmq_msg_t* handle() noexcept + { + return std::addressof(handle_); + } + + const char* data() noexcept + { + return static_cast<const char*>(zmq_msg_data(handle())); + } + + std::size_t size() noexcept + { + return zmq_msg_size(handle()); + } + }; + + struct do_receive + { + /* ZMQ documentation states that message parts are atomic - either + all are received or none are. Looking through ZMQ code and + Github discussions indicates that after part 1 is returned, + `EAGAIN` cannot be returned to meet these guarantees. Unit tests + verify (for the `inproc://` case) that this is the behavior. + Therefore, read errors after the first part are treated as a + failure for the entire message (probably `ETERM`). */ + int operator()(std::string& payload, void* const socket, const int flags) const + { + static constexpr const int max_out = std::numeric_limits<int>::max(); + const std::string::size_type initial = payload.size(); + message part{}; + for (;;) + { + int last = 0; + if ((last = zmq_msg_recv(part.handle(), socket, flags)) < 0) + return last; + + payload.append(part.data(), part.size()); + if (!zmq_msg_more(part.handle())) + break; + } + const std::string::size_type added = payload.size() - initial; + return unsigned(max_out) < added ? max_out : int(added); + } + }; + + template<typename F, typename... T> + expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...))) + { + for (;;) + { + if (0 <= op(args...)) + return success(); + + const int error = zmq_errno(); + if (error != EINTR) + return make_error_code(error); + } + } + } // anonymous + + expect<std::string> receive(void* const socket, const int flags) + { + std::string payload{}; + MONERO_CHECK(retry_op(do_receive{}, payload, socket, flags)); + return {std::move(payload)}; + } + + expect<void> send(const epee::span<const std::uint8_t> payload, void* const socket, const int flags) noexcept + { + return retry_op(zmq_send, socket, payload.data(), payload.size(), flags); + } +} // zmq +} // net + diff --git a/src/net/zmq.h b/src/net/zmq.h new file mode 100644 index 000000000..c6a7fd743 --- /dev/null +++ b/src/net/zmq.h @@ -0,0 +1,136 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include <memory> +#include <string> +#include <system_error> +#include <zmq.h> + +#include "common/expect.h" +#include "span.h" + +//! If the expression is less than 0, return the current ZMQ error code. +#define MONERO_ZMQ_CHECK(...) \ + do \ + { \ + if (( __VA_ARGS__ ) < 0) \ + return {::net::zmq::get_error_code()}; \ + } while (0) + +//! Print a message followed by the current ZMQ error message. +#define MONERO_LOG_ZMQ_ERROR(...) \ + do \ + { \ + MERROR( __VA_ARGS__ << ": " << ::net::zmq::get_error_code().message()); \ + } while (0) + +//! Throw an exception with a custom `msg`, current ZMQ error code, filename, and line number. +#define MONERO_ZMQ_THROW(msg) \ + MONERO_THROW( ::net::zmq::get_error_code(), msg ) + +namespace net +{ +namespace zmq +{ + //! \return Category for ZMQ errors. + const std::error_category& error_category() noexcept; + + //! \return `code` (usally from zmq_errno()`) using `net::zmq::error_category()`. + inline std::error_code make_error_code(int code) noexcept + { + return std::error_code{code, error_category()}; + } + + //! \return Error from `zmq_errno()` using `net::zmq::error_category()`. + inline std::error_code get_error_code() noexcept + { + return make_error_code(zmq_errno()); + } + + //! Calls `zmq_term` + class terminate + { + static void call(void* ptr) noexcept; + public: + void operator()(void* ptr) const noexcept + { + if (ptr) + call(ptr); + } + }; + + //! Calls `zmq_close` + struct close + { + void operator()(void* ptr) const noexcept + { + if (ptr) + zmq_close(ptr); + } + }; + + //! Unique ZMQ context handle, calls `zmq_term` on destruction. + using context = std::unique_ptr<void, terminate>; + + //! Unique ZMQ socket handle, calls `zmq_close` on destruction. + using socket = std::unique_ptr<void, close>; + + /*! Read all parts of the next message on `socket`. Blocks until the entire + next message (all parts) are read, or until `zmq_term` is called on the + `zmq_context` associated with `socket`. If the context is terminated, + `make_error_code(ETERM)` is returned. + + \note This will automatically retry on `EINTR`, so exiting on + interrupts requires context termination. + \note If non-blocking behavior is requested on `socket` or by `flags`, + then `net::zmq::make_error_code(EAGAIN)` will be returned if this + would block. + + \param socket Handle created with `zmq_socket`. + \param flags See `zmq_msg_read` for possible flags. + \return Message payload read from `socket` or ZMQ error. */ + expect<std::string> receive(void* socket, int flags = 0); + + /*! Sends `payload` on `socket`. Blocks until the entire message is queued + for sending, or until `zmq_term` is called on the `zmq_context` + associated with `socket`. If the context is terminated, + `make_error_code(ETERM)` is returned. + + \note This will automatically retry on `EINTR`, so exiting on + interrupts requires context termination. + \note If non-blocking behavior is requested on `socket` or by `flags`, + then `net::zmq::make_error_code(EAGAIN)` will be returned if this + would block. + + \param payload sent as one message on `socket`. + \param socket Handle created with `zmq_socket`. + \param flags See `zmq_send` for possible flags. + \return `success()` if sent, otherwise ZMQ error. */ + expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept; +} // zmq +} // net |