aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLee Clagett <code@leeclagett.com>2019-11-17 06:06:10 +0000
committerLee Clagett <code@leeclagett.com>2020-04-03 01:56:17 +0000
commitda9915746219482c25b96406d475c6fde9b02a31 (patch)
tree5b1c6be33c9511dc6af48b63d5f87877b9bd06ae
parentMerge pull request #6470 (diff)
downloadmonero-da9915746219482c25b96406d475c6fde9b02a31.tar.xz
Use byte_slice for sending zmq messages - removes data copy within zmq
-rw-r--r--contrib/epee/include/byte_slice.h10
-rw-r--r--contrib/epee/src/byte_slice.cpp19
-rw-r--r--src/net/zmq.cpp18
-rw-r--r--src/net/zmq.h24
-rw-r--r--src/rpc/daemon_handler.cpp14
-rw-r--r--src/rpc/daemon_handler.h3
-rw-r--r--src/rpc/message.cpp14
-rw-r--r--src/rpc/message.h11
-rw-r--r--src/rpc/rpc_handler.h3
-rw-r--r--src/rpc/zmq_server.cpp10
-rw-r--r--tests/unit_tests/net.cpp39
11 files changed, 138 insertions, 27 deletions
diff --git a/contrib/epee/include/byte_slice.h b/contrib/epee/include/byte_slice.h
index 1fbba101e..bd9452b11 100644
--- a/contrib/epee/include/byte_slice.h
+++ b/contrib/epee/include/byte_slice.h
@@ -42,7 +42,12 @@ namespace epee
struct release_byte_slice
{
- void operator()(byte_slice_data*) const noexcept;
+ //! For use with `zmq_message_init_data`, use second arg for buffer pointer.
+ static void call(void*, void* ptr) noexcept;
+ void operator()(byte_slice_data* ptr) const noexcept
+ {
+ call(nullptr, ptr);
+ }
};
/*! Inspired by slices in golang. Storage is thread-safe reference counted,
@@ -140,6 +145,9 @@ namespace epee
\throw std::out_of_range If `size() < end`.
\return Slice starting at `data() + begin` of size `end - begin`. */
byte_slice get_slice(std::size_t begin, std::size_t end) const;
+
+ //! \post `empty()` \return Ownership of ref-counted buffer.
+ std::unique_ptr<byte_slice_data, release_byte_slice> take_buffer() noexcept;
};
} // epee
diff --git a/contrib/epee/src/byte_slice.cpp b/contrib/epee/src/byte_slice.cpp
index 216049e5b..3ad3b3e7e 100644
--- a/contrib/epee/src/byte_slice.cpp
+++ b/contrib/epee/src/byte_slice.cpp
@@ -49,12 +49,16 @@ namespace epee
std::atomic<std::size_t> ref_count;
};
- void release_byte_slice::operator()(byte_slice_data* ptr) const noexcept
+ void release_byte_slice::call(void*, void* ptr) noexcept
{
- if (ptr && --(ptr->ref_count) == 0)
+ if (ptr)
{
- ptr->~byte_slice_data();
- free(ptr);
+ byte_slice_data* self = static_cast<byte_slice_data*>(ptr);
+ if (--(self->ref_count) == 0)
+ {
+ self->~byte_slice_data();
+ free(self);
+ }
}
}
@@ -206,4 +210,11 @@ namespace epee
return {};
return {storage_.get(), {portion_.begin() + begin, end - begin}};
}
+
+ std::unique_ptr<byte_slice_data, release_byte_slice> byte_slice::take_buffer() noexcept
+ {
+ std::unique_ptr<byte_slice_data, release_byte_slice> out{std::move(storage_)};
+ portion_ = nullptr;
+ return out;
+ }
} // epee
diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp
index d02a22983..7ea80b907 100644
--- a/src/net/zmq.cpp
+++ b/src/net/zmq.cpp
@@ -33,6 +33,8 @@
#include <limits>
#include <utility>
+#include "byte_slice.h"
+
namespace net
{
namespace zmq
@@ -183,6 +185,22 @@ namespace zmq
{
return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
}
+
+ expect<void> send(epee::byte_slice&& payload, void* socket, int flags) noexcept
+ {
+ void* const data = const_cast<std::uint8_t*>(payload.data());
+ const std::size_t size = payload.size();
+ auto buffer = payload.take_buffer(); // clears `payload` from callee
+
+ zmq_msg_t msg{};
+ MONERO_ZMQ_CHECK(zmq_msg_init_data(std::addressof(msg), data, size, epee::release_byte_slice::call, buffer.get()));
+ buffer.release(); // zmq will now decrement byte_slice ref-count
+
+ expect<void> sent = retry_op(zmq_msg_send, std::addressof(msg), socket, flags);
+ if (!sent) // beware if removing `noexcept` from this function - possible leak here
+ zmq_msg_close(std::addressof(msg));
+ return sent;
+ }
} // zmq
} // net
diff --git a/src/net/zmq.h b/src/net/zmq.h
index c6a7fd743..65560b62e 100644
--- a/src/net/zmq.h
+++ b/src/net/zmq.h
@@ -53,6 +53,11 @@
#define MONERO_ZMQ_THROW(msg) \
MONERO_THROW( ::net::zmq::get_error_code(), msg )
+namespace epee
+{
+ class byte_slice;
+}
+
namespace net
{
namespace zmq
@@ -132,5 +137,24 @@ namespace zmq
\param flags See `zmq_send` for possible flags.
\return `success()` if sent, otherwise ZMQ error. */
expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
+
+ /*! Sends `payload` on `socket`. Blocks until the entire message is queued
+ for sending, or until `zmq_term` is called on the `zmq_context`
+ associated with `socket`. If the context is terminated,
+ `make_error_code(ETERM)` is returned.
+
+ \note This will automatically retry on `EINTR`, so exiting on
+ interrupts requires context termination.
+ \note If non-blocking behavior is requested on `socket` or by `flags`,
+ then `net::zmq::make_error_code(EAGAIN)` will be returned if this
+ would block.
+
+ \param payload sent as one message on `socket`.
+ \param socket Handle created with `zmq_socket`.
+ \param flags See `zmq_msg_send` for possible flags.
+
+ \post `payload.emtpy()` - ownership is transferred to zmq.
+ \return `success()` if sent, otherwise ZMQ error. */
+ expect<void> send(epee::byte_slice&& payload, void* socket, int flags = 0) noexcept;
} // zmq
} // net
diff --git a/src/rpc/daemon_handler.cpp b/src/rpc/daemon_handler.cpp
index d05854e34..de0510fec 100644
--- a/src/rpc/daemon_handler.cpp
+++ b/src/rpc/daemon_handler.cpp
@@ -33,6 +33,7 @@
#include <stdexcept>
#include <boost/uuid/nil_generator.hpp>
+#include <boost/utility/string_ref.hpp>
// likely included by daemon_handler.h's includes,
// but including here for clarity
#include "cryptonote_core/cryptonote_core.h"
@@ -48,7 +49,7 @@ namespace rpc
{
namespace
{
- using handler_function = std::string(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg);
+ using handler_function = epee::byte_slice(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg);
struct handler_map
{
const char* method_name;
@@ -66,7 +67,7 @@ namespace rpc
}
template<typename Message>
- std::string handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters)
+ epee::byte_slice handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters)
{
typename Message::Request request{};
request.fromJson(parameters);
@@ -903,7 +904,7 @@ namespace rpc
return true;
}
- std::string DaemonHandler::handle(const std::string& request)
+ epee::byte_slice DaemonHandler::handle(const std::string& request)
{
MDEBUG("Handling RPC request: " << request);
@@ -916,8 +917,11 @@ namespace rpc
if (matched_handler == std::end(handlers) || matched_handler->method_name != request_type)
return BAD_REQUEST(request_type, req_full.getID());
- std::string response = matched_handler->call(*this, req_full.getID(), req_full.getMessage());
- MDEBUG("Returning RPC response: " << response);
+ epee::byte_slice response = matched_handler->call(*this, req_full.getID(), req_full.getMessage());
+
+ const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
+ MDEBUG("Returning RPC response: " << response_view);
+
return response;
}
catch (const std::exception& e)
diff --git a/src/rpc/daemon_handler.h b/src/rpc/daemon_handler.h
index 61eac17f0..b797b1155 100644
--- a/src/rpc/daemon_handler.h
+++ b/src/rpc/daemon_handler.h
@@ -28,6 +28,7 @@
#pragma once
+#include "byte_slice.h"
#include "daemon_messages.h"
#include "daemon_rpc_version.h"
#include "rpc_handler.h"
@@ -132,7 +133,7 @@ class DaemonHandler : public RpcHandler
void handle(const GetOutputDistribution::Request& req, GetOutputDistribution::Response& res);
- std::string handle(const std::string& request);
+ epee::byte_slice handle(const std::string& request) override final;
private:
diff --git a/src/rpc/message.cpp b/src/rpc/message.cpp
index 5b6a1c05b..0d8983cb1 100644
--- a/src/rpc/message.cpp
+++ b/src/rpc/message.cpp
@@ -149,7 +149,7 @@ cryptonote::rpc::error FullMessage::getError()
return err;
}
-std::string FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id)
+epee::byte_slice FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id)
{
rapidjson::StringBuffer buffer;
{
@@ -172,11 +172,11 @@ std::string FullMessage::getRequest(const std::string& request, const Message& m
if (!dest.IsComplete())
throw std::logic_error{"Invalid JSON tree generated"};
}
- return std::string{buffer.GetString(), buffer.GetSize()};
+ return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}};
}
-std::string FullMessage::getResponse(const Message& message, const rapidjson::Value& id)
+epee::byte_slice FullMessage::getResponse(const Message& message, const rapidjson::Value& id)
{
rapidjson::StringBuffer buffer;
{
@@ -207,17 +207,17 @@ std::string FullMessage::getResponse(const Message& message, const rapidjson::Va
if (!dest.IsComplete())
throw std::logic_error{"Invalid JSON tree generated"};
}
- return std::string{buffer.GetString(), buffer.GetSize()};
+ return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}};
}
// convenience functions for bad input
-std::string BAD_REQUEST(const std::string& request)
+epee::byte_slice BAD_REQUEST(const std::string& request)
{
rapidjson::Value invalid;
return BAD_REQUEST(request, invalid);
}
-std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
+epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
{
Message fail;
fail.status = Message::STATUS_BAD_REQUEST;
@@ -225,7 +225,7 @@ std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
return FullMessage::getResponse(fail, id);
}
-std::string BAD_JSON(const std::string& error_details)
+epee::byte_slice BAD_JSON(const std::string& error_details)
{
rapidjson::Value invalid;
Message fail;
diff --git a/src/rpc/message.h b/src/rpc/message.h
index 4cbc84fe4..8156e232c 100644
--- a/src/rpc/message.h
+++ b/src/rpc/message.h
@@ -33,6 +33,7 @@
#include <rapidjson/writer.h>
#include <string>
+#include "byte_slice.h"
#include "rpc/message_data_structs.h"
namespace cryptonote
@@ -85,8 +86,8 @@ namespace rpc
cryptonote::rpc::error getError();
- static std::string getRequest(const std::string& request, const Message& message, unsigned id);
- static std::string getResponse(const Message& message, const rapidjson::Value& id);
+ static epee::byte_slice getRequest(const std::string& request, const Message& message, unsigned id);
+ static epee::byte_slice getResponse(const Message& message, const rapidjson::Value& id);
private:
FullMessage() = default;
@@ -99,10 +100,10 @@ namespace rpc
// convenience functions for bad input
- std::string BAD_REQUEST(const std::string& request);
- std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id);
+ epee::byte_slice BAD_REQUEST(const std::string& request);
+ epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id);
- std::string BAD_JSON(const std::string& error_details);
+ epee::byte_slice BAD_JSON(const std::string& error_details);
} // namespace rpc
diff --git a/src/rpc/rpc_handler.h b/src/rpc/rpc_handler.h
index b81983d28..9a1c3fc12 100644
--- a/src/rpc/rpc_handler.h
+++ b/src/rpc/rpc_handler.h
@@ -32,6 +32,7 @@
#include <cstdint>
#include <string>
#include <vector>
+#include "byte_slice.h"
#include "crypto/hash.h"
namespace cryptonote
@@ -54,7 +55,7 @@ class RpcHandler
RpcHandler() { }
virtual ~RpcHandler() { }
- virtual std::string handle(const std::string& request) = 0;
+ virtual epee::byte_slice handle(const std::string& request) = 0;
static boost::optional<output_distribution_data>
get_output_distribution(const std::function<bool(uint64_t, uint64_t, uint64_t, uint64_t&, std::vector<uint64_t>&, uint64_t&)> &f, uint64_t amount, uint64_t from_height, uint64_t to_height, const std::function<crypto::hash(uint64_t)> &get_hash, bool cumulative, uint64_t blockchain_height);
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 1ee55673e..91e4751d1 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -28,10 +28,13 @@
#include "zmq_server.h"
+#include <boost/utility/string_ref.hpp>
#include <chrono>
#include <cstdint>
#include <system_error>
+#include "byte_slice.h"
+
namespace cryptonote
{
@@ -73,10 +76,11 @@ void ZmqServer::serve()
{
const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
MDEBUG("Received RPC request: \"" << message << "\"");
- const std::string& response = handler.handle(message);
+ epee::byte_slice response = handler.handle(message);
- MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
- MDEBUG("Sent RPC reply: \"" << response << "\"");
+ const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
+ MDEBUG("Sending RPC reply: \"" << response_view << "\"");
+ MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get()));
}
}
catch (const std::system_error& e)
diff --git a/tests/unit_tests/net.cpp b/tests/unit_tests/net.cpp
index 36cb28ae0..f5aef4796 100644
--- a/tests/unit_tests/net.cpp
+++ b/tests/unit_tests/net.cpp
@@ -1702,6 +1702,45 @@ TEST(zmq, read_write)
EXPECT_EQ(message, *received);
}
+TEST(zmq, read_write_slice)
+{
+ net::zmq::context context{zmq_init(1)};
+ ASSERT_NE(nullptr, context);
+
+ net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
+ net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
+ ASSERT_NE(nullptr, send_socket);
+ ASSERT_NE(nullptr, recv_socket);
+
+ ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
+ ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
+
+ std::string message;
+ message.resize(1024);
+ crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
+
+ {
+ epee::byte_slice slice_message{{epee::strspan<std::uint8_t>(message)}};
+ ASSERT_TRUE(bool(net::zmq::send(std::move(slice_message), send_socket.get())));
+ EXPECT_TRUE(slice_message.empty());
+ }
+
+ const expect<std::string> received = net::zmq::receive(recv_socket.get());
+ ASSERT_TRUE(bool(received));
+ EXPECT_EQ(message, *received);
+}
+
+TEST(zmq, write_slice_fail)
+{
+ std::string message;
+ message.resize(1024);
+ crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
+
+ epee::byte_slice slice_message{std::move(message)};
+ EXPECT_FALSE(bool(net::zmq::send(std::move(slice_message), nullptr)));
+ EXPECT_TRUE(slice_message.empty());
+}
+
TEST(zmq, read_write_multipart)
{
net::zmq::context context{zmq_init(1)};