aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/levin_protocol_handler_async.h
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net/levin_protocol_handler_async.h')
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h132
1 files changed, 98 insertions, 34 deletions
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index 3f734a72b..208911e1a 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -32,6 +32,7 @@
#include <boost/smart_ptr/make_shared.hpp>
#include <atomic>
+#include <deque>
#include "levin_base.h"
#include "buffer.h"
@@ -91,6 +92,7 @@ public:
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
+ int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
bool request_callback(boost::uuids::uuid connection_id);
@@ -117,6 +119,22 @@ public:
template<class t_connection_context = net_utils::connection_context_base>
class async_protocol_handler
{
+ std::string m_fragment_buffer;
+
+ bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
+ {
+ const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
+ if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
+ return false;
+
+ MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
+ << ", flags" << head.m_flags
+ << ", r?=" << head.m_have_to_return_data
+ <<", cmd = " << head.m_command
+ << ", ver=" << head.m_protocol_version);
+ return true;
+ }
+
public:
typedef t_connection_context connection_context;
typedef async_protocol_handler_config<t_connection_context> config_type;
@@ -259,34 +277,6 @@ public:
return handler->is_timer_started();
}
template<class callback_t> friend struct anvoke_handler;
-
- static bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept
- {
- bucket_head2 head = {0};
- head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
- head.m_have_to_return_data = expect_response;
- head.m_cb = SWAP64LE(msg_size);
-
- head.m_command = SWAP32LE(command);
- head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
- head.m_flags = SWAP32LE(flags);
- return head;
- }
-
- bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
- {
- const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
- if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
- return false;
-
- MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
- << ", flags" << head.m_flags
- << ", r?=" << head.m_have_to_return_data
- <<", cmd = " << head.m_command
- << ", ver=" << head.m_protocol_version);
- return true;
- }
-
public:
async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr,
config_type& config,
@@ -403,7 +393,12 @@ public:
return false;
}
- if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size)
+ // these should never fail, but do runtime check for safety
+ CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()");
+ CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()");
+
+ // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public
+ if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size())
{
MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
<< ", packet received " << m_cache_in_buffer.size() + cb
@@ -435,8 +430,38 @@ public:
}
break;
}
+
{
+ std::string temp{};
epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
+ m_state = stream_state_head;
+
+ // abstract_tcp_server2.h manages max bandwidth for a p2p link
+ if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE)))
+ {
+ // special noise/fragment command
+ static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END);
+ if ((m_current_head.m_flags & both_flags) == both_flags)
+ break; // noise message, skip to next message
+
+ if (m_current_head.m_flags & LEVIN_PACKET_BEGIN)
+ m_fragment_buffer.clear();
+
+ m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size());
+ if (!(m_current_head.m_flags & LEVIN_PACKET_END))
+ break; // skip to next message
+
+ if (m_fragment_buffer.size() < sizeof(bucket_head2))
+ {
+ MERROR(m_connection_context << "Fragmented data too small for levin header");
+ return false;
+ }
+
+ temp = std::move(m_fragment_buffer);
+ m_fragment_buffer.clear();
+ std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2));
+ buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)};
+ }
bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE);
@@ -489,9 +514,9 @@ public:
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
);
- bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
- head.m_return_code = SWAP32LE(return_code);
- return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head));
+ bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
+ head.m_return_code = SWAP32LE(return_code);
+ return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head));
if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)}))
return false;
@@ -505,8 +530,13 @@ public:
else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
}
+ // reuse small buffer
+ if (!temp.empty() && temp.capacity() <= 64 * 1024)
+ {
+ temp.clear();
+ m_fragment_buffer = std::move(temp);
+ }
}
- m_state = stream_state_head;
break;
case stream_state_head:
{
@@ -616,7 +646,7 @@ public:
if (LEVIN_OK != err_code)
{
- epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0};
+ epee::span<const uint8_t> stub_buff = nullptr;
// Never call callback inside critical section, that can cause deadlock
cb(err_code, stub_buff, m_connection_context);
return false;
@@ -698,6 +728,32 @@ public:
return 1;
}
+
+ /*! Sends `message` without adding a levin header. The message must have
+ been created with `make_notify`, `make_noise_notify` or
+ `make_fragmented_notify`. See additional instructions for
+ `make_fragmented_notify`.
+
+ \return 1 on success */
+ int send(byte_slice message)
+ {
+ const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
+ boost::bind(&async_protocol_handler::finish_outer_call, this)
+ );
+
+ if(m_deletion_initiated)
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+
+ const std::size_t length = message.size();
+ if (!m_pservice_endpoint->do_send(std::move(message)))
+ {
+ LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
+ return -1;
+ }
+
+ MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
+ return 1;
+ }
//------------------------------------------------------------------------------------------
boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;}
//------------------------------------------------------------------------------------------
@@ -876,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
+int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
+{
+ async_protocol_handler<t_connection_context>* aph;
+ int r = find_and_lock_connection(connection_id, aph);
+ return LEVIN_OK == r ? aph->send(std::move(message)) : 0;
+}
+//------------------------------------------------------------------------------------------
+template<class t_connection_context>
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
{
CRITICAL_REGION_LOCAL(m_connects_lock);