aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net')
-rw-r--r--contrib/epee/include/net/levin_base.h44
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h97
2 files changed, 64 insertions, 77 deletions
diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h
index df59a6c44..fce6d4b7e 100644
--- a/contrib/epee/include/net/levin_base.h
+++ b/contrib/epee/include/net/levin_base.h
@@ -31,7 +31,6 @@
#include <cstdint>
-#include "byte_stream.h"
#include "net_utils_base.h"
#include "span.h"
@@ -84,12 +83,11 @@ namespace levin
#define LEVIN_PROTOCOL_VER_0 0
#define LEVIN_PROTOCOL_VER_1 1
-
template<class t_connection_context = net_utils::connection_context_base>
struct levin_commands_handler
{
- virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0;
+ virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0;
virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0;
virtual void callback(t_connection_context& context){};
@@ -127,41 +125,12 @@ namespace levin
}
}
- //! Provides space for levin (p2p) header, so that payload can be sent without copy
- class message_writer
- {
- byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response);
- public:
- using header = bucket_head2;
-
- explicit message_writer(std::size_t reserve = 8192);
-
- message_writer(const message_writer&) = delete;
- message_writer(message_writer&&) = default;
- ~message_writer() = default;
- message_writer& operator=(const message_writer&) = delete;
- message_writer& operator=(message_writer&&) = default;
-
- //! \return Size of payload (excludes header size).
- std::size_t payload_size() const noexcept
- {
- return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header);
- }
-
- byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); }
- byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); }
- byte_slice finalize_response(uint32_t command, uint32_t return_code)
- {
- return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false);
- }
-
- //! Has space for levin header until a finalize method is used
- byte_stream buffer;
- };
-
//! \return Intialized levin header.
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept;
+ //! \return A levin notification message.
+ byte_slice make_notify(int command, epee::span<const std::uint8_t> payload);
+
/*! Generate a dummy levin message.
\param noise_bytes Total size of the returned `byte_slice`.
@@ -171,11 +140,12 @@ namespace levin
/*! Generate 1+ levin messages that are identical to the noise message size.
- \param noise_size Each levin message will be identical to this value.
+ \param noise Each levin message will be identical to the size of this
+ message. The bytes from this message will be used for padding.
\return `nullptr` if `noise.size()` is less than the levin header size.
Otherwise, a levin notification message OR 2+ levin fragment messages.
Each message is `noise.size()` in length. */
- byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message);
+ byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload);
}
}
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index a6816cafc..d062fa877 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -51,21 +51,6 @@
#define MIN_BYTES_WANTED 512
#endif
-template<typename context_t>
-void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
-{
- MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
- << " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
-}
-
-template<typename context_t>
-void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
-{
- char buf[32];
- snprintf(buf, sizeof(buf), "command-%u", command);
- on_levin_traffic(context, initiator, sent, error, bytes, buf);
-}
-
namespace epee
{
namespace levin
@@ -103,10 +88,11 @@ public:
uint64_t m_max_packet_size;
uint64_t m_invoke_timeout;
- int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
+ int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t>
- int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
+ int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
+ int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
@@ -136,17 +122,12 @@ class async_protocol_handler
{
std::string m_fragment_buffer;
- bool send_message(byte_slice message)
+ bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
{
- if (message.size() < sizeof(message_writer::header))
- return false;
-
- message_writer::header head;
- std::memcpy(std::addressof(head), message.data(), sizeof(head));
- if(!m_pservice_endpoint->do_send(std::move(message)))
+ const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
+ if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
return false;
- on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
@@ -542,17 +523,26 @@ public:
{
if(m_current_head.m_have_to_return_data)
{
- levin::message_writer return_message{32 * 1024};
+ byte_slice return_buff;
const uint32_t return_code = m_config.m_pcommands_handler->invoke(
- m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
+ m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
);
// peer_id remains unset if dropped
if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
m_max_packet_size = m_config.m_max_packet_size;
- if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
+ bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
+ head.m_return_code = SWAP32LE(return_code);
+
+ if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}}))
return false;
+
+ MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
+ << ", flags" << head.m_flags
+ << ", r?=" << head.m_have_to_return_data
+ <<", cmd = " << head.m_command
+ << ", ver=" << head.m_protocol_version);
}
else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
@@ -629,7 +619,7 @@ public:
}
template<class callback_t>
- bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
+ bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@@ -648,7 +638,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
- if(!send_message(in_msg.finalize_invoke(command)))
+ if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
err_code = LEVIN_ERROR_CONNECTION;
@@ -674,7 +664,7 @@ public:
return true;
}
- int invoke(int command, message_writer in_msg, std::string& buff_out)
+ int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@@ -686,7 +676,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
- if (!send_message(in_msg.finalize_invoke(command)))
+ if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{
LOG_ERROR_CC(m_connection_context, "Failed to send request");
return LEVIN_ERROR_CONNECTION;
@@ -723,9 +713,25 @@ public:
return m_invoke_result_code;
}
- /*! Sends `message` without adding a levin header. The message must have been
- created with `make_noise_notify`, `make_fragmented_notify`, or
- `message_writer::finalize_notify`. See additional instructions for
+ int notify(int command, const epee::span<const uint8_t> in_buff)
+ {
+ misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
+ boost::bind(&async_protocol_handler::finish_outer_call, this));
+
+ CRITICAL_REGION_LOCAL(m_call_lock);
+
+ if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false))
+ {
+ LOG_ERROR_CC(m_connection_context, "Failed to send notify message");
+ return -1;
+ }
+
+ return 1;
+ }
+
+ /*! Sends `message` without adding a levin header. The message must have
+ been created with `make_notify`, `make_noise_notify` or
+ `make_fragmented_notify`. See additional instructions for
`make_fragmented_notify`.
\return 1 on success */
@@ -735,11 +741,14 @@ public:
boost::bind(&async_protocol_handler::finish_outer_call, this)
);
- if (!send_message(std::move(message)))
+ const std::size_t length = message.size();
+ if (!m_pservice_endpoint->do_send(std::move(message)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
return -1;
}
+
+ MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
return 1;
}
//------------------------------------------------------------------------------------------
@@ -829,19 +838,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
-int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id)
+int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
- return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
+ return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
-int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
+int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
- return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
+ return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
@@ -920,6 +929,14 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
+int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
+{
+ async_protocol_handler<t_connection_context>* aph;
+ int r = find_and_lock_connection(connection_id, aph);
+ return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
+}
+//------------------------------------------------------------------------------------------
+template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{
async_protocol_handler<t_connection_context>* aph;