diff options
author | luigi1111 <luigi1111w@gmail.com> | 2020-05-01 15:13:58 -0500 |
---|---|---|
committer | luigi1111 <luigi1111w@gmail.com> | 2020-05-01 15:13:58 -0500 |
commit | 3e21e591b8201fdb852822b1d30faa1fb97d870a (patch) | |
tree | 66f7e7af1eb010d5ed97320916b40abc58a36268 | |
parent | Merge pull request #6487 (diff) | |
parent | Use byte_slice for sending zmq messages - removes data copy within zmq (diff) | |
download | monero-3e21e591b8201fdb852822b1d30faa1fb97d870a.tar.xz |
Merge pull request #6350
da99157 Use byte_slice for sending zmq messages - removes data copy within zmq (vtnerd)
-rw-r--r-- | contrib/epee/include/byte_slice.h | 10 | ||||
-rw-r--r-- | contrib/epee/src/byte_slice.cpp | 19 | ||||
-rw-r--r-- | src/net/zmq.cpp | 18 | ||||
-rw-r--r-- | src/net/zmq.h | 24 | ||||
-rw-r--r-- | src/rpc/daemon_handler.cpp | 14 | ||||
-rw-r--r-- | src/rpc/daemon_handler.h | 3 | ||||
-rw-r--r-- | src/rpc/message.cpp | 14 | ||||
-rw-r--r-- | src/rpc/message.h | 11 | ||||
-rw-r--r-- | src/rpc/rpc_handler.h | 3 | ||||
-rw-r--r-- | src/rpc/zmq_server.cpp | 10 | ||||
-rw-r--r-- | tests/unit_tests/net.cpp | 39 |
11 files changed, 138 insertions, 27 deletions
diff --git a/contrib/epee/include/byte_slice.h b/contrib/epee/include/byte_slice.h index 1fbba101e..bd9452b11 100644 --- a/contrib/epee/include/byte_slice.h +++ b/contrib/epee/include/byte_slice.h @@ -42,7 +42,12 @@ namespace epee struct release_byte_slice { - void operator()(byte_slice_data*) const noexcept; + //! For use with `zmq_message_init_data`, use second arg for buffer pointer. + static void call(void*, void* ptr) noexcept; + void operator()(byte_slice_data* ptr) const noexcept + { + call(nullptr, ptr); + } }; /*! Inspired by slices in golang. Storage is thread-safe reference counted, @@ -140,6 +145,9 @@ namespace epee \throw std::out_of_range If `size() < end`. \return Slice starting at `data() + begin` of size `end - begin`. */ byte_slice get_slice(std::size_t begin, std::size_t end) const; + + //! \post `empty()` \return Ownership of ref-counted buffer. + std::unique_ptr<byte_slice_data, release_byte_slice> take_buffer() noexcept; }; } // epee diff --git a/contrib/epee/src/byte_slice.cpp b/contrib/epee/src/byte_slice.cpp index 216049e5b..3ad3b3e7e 100644 --- a/contrib/epee/src/byte_slice.cpp +++ b/contrib/epee/src/byte_slice.cpp @@ -49,12 +49,16 @@ namespace epee std::atomic<std::size_t> ref_count; }; - void release_byte_slice::operator()(byte_slice_data* ptr) const noexcept + void release_byte_slice::call(void*, void* ptr) noexcept { - if (ptr && --(ptr->ref_count) == 0) + if (ptr) { - ptr->~byte_slice_data(); - free(ptr); + byte_slice_data* self = static_cast<byte_slice_data*>(ptr); + if (--(self->ref_count) == 0) + { + self->~byte_slice_data(); + free(self); + } } } @@ -206,4 +210,11 @@ namespace epee return {}; return {storage_.get(), {portion_.begin() + begin, end - begin}}; } + + std::unique_ptr<byte_slice_data, release_byte_slice> byte_slice::take_buffer() noexcept + { + std::unique_ptr<byte_slice_data, release_byte_slice> out{std::move(storage_)}; + portion_ = nullptr; + return out; + } } // epee diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp index d02a22983..7ea80b907 100644 --- a/src/net/zmq.cpp +++ b/src/net/zmq.cpp @@ -33,6 +33,8 @@ #include <limits> #include <utility> +#include "byte_slice.h" + namespace net { namespace zmq @@ -183,6 +185,22 @@ namespace zmq { return retry_op(zmq_send, socket, payload.data(), payload.size(), flags); } + + expect<void> send(epee::byte_slice&& payload, void* socket, int flags) noexcept + { + void* const data = const_cast<std::uint8_t*>(payload.data()); + const std::size_t size = payload.size(); + auto buffer = payload.take_buffer(); // clears `payload` from callee + + zmq_msg_t msg{}; + MONERO_ZMQ_CHECK(zmq_msg_init_data(std::addressof(msg), data, size, epee::release_byte_slice::call, buffer.get())); + buffer.release(); // zmq will now decrement byte_slice ref-count + + expect<void> sent = retry_op(zmq_msg_send, std::addressof(msg), socket, flags); + if (!sent) // beware if removing `noexcept` from this function - possible leak here + zmq_msg_close(std::addressof(msg)); + return sent; + } } // zmq } // net diff --git a/src/net/zmq.h b/src/net/zmq.h index c6a7fd743..65560b62e 100644 --- a/src/net/zmq.h +++ b/src/net/zmq.h @@ -53,6 +53,11 @@ #define MONERO_ZMQ_THROW(msg) \ MONERO_THROW( ::net::zmq::get_error_code(), msg ) +namespace epee +{ + class byte_slice; +} + namespace net { namespace zmq @@ -132,5 +137,24 @@ namespace zmq \param flags See `zmq_send` for possible flags. \return `success()` if sent, otherwise ZMQ error. */ expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept; + + /*! Sends `payload` on `socket`. Blocks until the entire message is queued + for sending, or until `zmq_term` is called on the `zmq_context` + associated with `socket`. If the context is terminated, + `make_error_code(ETERM)` is returned. + + \note This will automatically retry on `EINTR`, so exiting on + interrupts requires context termination. + \note If non-blocking behavior is requested on `socket` or by `flags`, + then `net::zmq::make_error_code(EAGAIN)` will be returned if this + would block. + + \param payload sent as one message on `socket`. + \param socket Handle created with `zmq_socket`. + \param flags See `zmq_msg_send` for possible flags. + + \post `payload.emtpy()` - ownership is transferred to zmq. + \return `success()` if sent, otherwise ZMQ error. */ + expect<void> send(epee::byte_slice&& payload, void* socket, int flags = 0) noexcept; } // zmq } // net diff --git a/src/rpc/daemon_handler.cpp b/src/rpc/daemon_handler.cpp index d05854e34..de0510fec 100644 --- a/src/rpc/daemon_handler.cpp +++ b/src/rpc/daemon_handler.cpp @@ -33,6 +33,7 @@ #include <stdexcept> #include <boost/uuid/nil_generator.hpp> +#include <boost/utility/string_ref.hpp> // likely included by daemon_handler.h's includes, // but including here for clarity #include "cryptonote_core/cryptonote_core.h" @@ -48,7 +49,7 @@ namespace rpc { namespace { - using handler_function = std::string(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg); + using handler_function = epee::byte_slice(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg); struct handler_map { const char* method_name; @@ -66,7 +67,7 @@ namespace rpc } template<typename Message> - std::string handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters) + epee::byte_slice handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters) { typename Message::Request request{}; request.fromJson(parameters); @@ -903,7 +904,7 @@ namespace rpc return true; } - std::string DaemonHandler::handle(const std::string& request) + epee::byte_slice DaemonHandler::handle(const std::string& request) { MDEBUG("Handling RPC request: " << request); @@ -916,8 +917,11 @@ namespace rpc if (matched_handler == std::end(handlers) || matched_handler->method_name != request_type) return BAD_REQUEST(request_type, req_full.getID()); - std::string response = matched_handler->call(*this, req_full.getID(), req_full.getMessage()); - MDEBUG("Returning RPC response: " << response); + epee::byte_slice response = matched_handler->call(*this, req_full.getID(), req_full.getMessage()); + + const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()}; + MDEBUG("Returning RPC response: " << response_view); + return response; } catch (const std::exception& e) diff --git a/src/rpc/daemon_handler.h b/src/rpc/daemon_handler.h index 61eac17f0..b797b1155 100644 --- a/src/rpc/daemon_handler.h +++ b/src/rpc/daemon_handler.h @@ -28,6 +28,7 @@ #pragma once +#include "byte_slice.h" #include "daemon_messages.h" #include "daemon_rpc_version.h" #include "rpc_handler.h" @@ -132,7 +133,7 @@ class DaemonHandler : public RpcHandler void handle(const GetOutputDistribution::Request& req, GetOutputDistribution::Response& res); - std::string handle(const std::string& request); + epee::byte_slice handle(const std::string& request) override final; private: diff --git a/src/rpc/message.cpp b/src/rpc/message.cpp index 5b6a1c05b..0d8983cb1 100644 --- a/src/rpc/message.cpp +++ b/src/rpc/message.cpp @@ -149,7 +149,7 @@ cryptonote::rpc::error FullMessage::getError() return err; } -std::string FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id) +epee::byte_slice FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id) { rapidjson::StringBuffer buffer; { @@ -172,11 +172,11 @@ std::string FullMessage::getRequest(const std::string& request, const Message& m if (!dest.IsComplete()) throw std::logic_error{"Invalid JSON tree generated"}; } - return std::string{buffer.GetString(), buffer.GetSize()}; + return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}}; } -std::string FullMessage::getResponse(const Message& message, const rapidjson::Value& id) +epee::byte_slice FullMessage::getResponse(const Message& message, const rapidjson::Value& id) { rapidjson::StringBuffer buffer; { @@ -207,17 +207,17 @@ std::string FullMessage::getResponse(const Message& message, const rapidjson::Va if (!dest.IsComplete()) throw std::logic_error{"Invalid JSON tree generated"}; } - return std::string{buffer.GetString(), buffer.GetSize()}; + return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}}; } // convenience functions for bad input -std::string BAD_REQUEST(const std::string& request) +epee::byte_slice BAD_REQUEST(const std::string& request) { rapidjson::Value invalid; return BAD_REQUEST(request, invalid); } -std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id) +epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id) { Message fail; fail.status = Message::STATUS_BAD_REQUEST; @@ -225,7 +225,7 @@ std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id) return FullMessage::getResponse(fail, id); } -std::string BAD_JSON(const std::string& error_details) +epee::byte_slice BAD_JSON(const std::string& error_details) { rapidjson::Value invalid; Message fail; diff --git a/src/rpc/message.h b/src/rpc/message.h index 4cbc84fe4..8156e232c 100644 --- a/src/rpc/message.h +++ b/src/rpc/message.h @@ -33,6 +33,7 @@ #include <rapidjson/writer.h> #include <string> +#include "byte_slice.h" #include "rpc/message_data_structs.h" namespace cryptonote @@ -85,8 +86,8 @@ namespace rpc cryptonote::rpc::error getError(); - static std::string getRequest(const std::string& request, const Message& message, unsigned id); - static std::string getResponse(const Message& message, const rapidjson::Value& id); + static epee::byte_slice getRequest(const std::string& request, const Message& message, unsigned id); + static epee::byte_slice getResponse(const Message& message, const rapidjson::Value& id); private: FullMessage() = default; @@ -99,10 +100,10 @@ namespace rpc // convenience functions for bad input - std::string BAD_REQUEST(const std::string& request); - std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id); + epee::byte_slice BAD_REQUEST(const std::string& request); + epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id); - std::string BAD_JSON(const std::string& error_details); + epee::byte_slice BAD_JSON(const std::string& error_details); } // namespace rpc diff --git a/src/rpc/rpc_handler.h b/src/rpc/rpc_handler.h index b81983d28..9a1c3fc12 100644 --- a/src/rpc/rpc_handler.h +++ b/src/rpc/rpc_handler.h @@ -32,6 +32,7 @@ #include <cstdint> #include <string> #include <vector> +#include "byte_slice.h" #include "crypto/hash.h" namespace cryptonote @@ -54,7 +55,7 @@ class RpcHandler RpcHandler() { } virtual ~RpcHandler() { } - virtual std::string handle(const std::string& request) = 0; + virtual epee::byte_slice handle(const std::string& request) = 0; static boost::optional<output_distribution_data> get_output_distribution(const std::function<bool(uint64_t, uint64_t, uint64_t, uint64_t&, std::vector<uint64_t>&, uint64_t&)> &f, uint64_t amount, uint64_t from_height, uint64_t to_height, const std::function<crypto::hash(uint64_t)> &get_hash, bool cumulative, uint64_t blockchain_height); diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 1ee55673e..91e4751d1 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -28,10 +28,13 @@ #include "zmq_server.h" +#include <boost/utility/string_ref.hpp> #include <chrono> #include <cstdint> #include <system_error> +#include "byte_slice.h" + namespace cryptonote { @@ -73,10 +76,11 @@ void ZmqServer::serve() { const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get())); MDEBUG("Received RPC request: \"" << message << "\""); - const std::string& response = handler.handle(message); + epee::byte_slice response = handler.handle(message); - MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get())); - MDEBUG("Sent RPC reply: \"" << response << "\""); + const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()}; + MDEBUG("Sending RPC reply: \"" << response_view << "\""); + MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get())); } } catch (const std::system_error& e) diff --git a/tests/unit_tests/net.cpp b/tests/unit_tests/net.cpp index 36cb28ae0..f5aef4796 100644 --- a/tests/unit_tests/net.cpp +++ b/tests/unit_tests/net.cpp @@ -1702,6 +1702,45 @@ TEST(zmq, read_write) EXPECT_EQ(message, *received); } +TEST(zmq, read_write_slice) +{ + net::zmq::context context{zmq_init(1)}; + ASSERT_NE(nullptr, context); + + net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)}; + net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)}; + ASSERT_NE(nullptr, send_socket); + ASSERT_NE(nullptr, recv_socket); + + ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing")); + ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing")); + + std::string message; + message.resize(1024); + crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0]))); + + { + epee::byte_slice slice_message{{epee::strspan<std::uint8_t>(message)}}; + ASSERT_TRUE(bool(net::zmq::send(std::move(slice_message), send_socket.get()))); + EXPECT_TRUE(slice_message.empty()); + } + + const expect<std::string> received = net::zmq::receive(recv_socket.get()); + ASSERT_TRUE(bool(received)); + EXPECT_EQ(message, *received); +} + +TEST(zmq, write_slice_fail) +{ + std::string message; + message.resize(1024); + crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0]))); + + epee::byte_slice slice_message{std::move(message)}; + EXPECT_FALSE(bool(net::zmq::send(std::move(slice_message), nullptr))); + EXPECT_TRUE(slice_message.empty()); +} + TEST(zmq, read_write_multipart) { net::zmq::context context{zmq_init(1)}; |