diff options
Diffstat (limited to '')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.h | 28 | ||||
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 78 | ||||
-rw-r--r-- | contrib/epee/include/net/connection_basic.hpp | 9 | ||||
-rw-r--r-- | contrib/epee/include/net/enums.h | 2 | ||||
-rw-r--r-- | contrib/epee/include/net/http_protocol_handler.inl | 9 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_base.h | 27 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler.h | 5 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 207 | ||||
-rw-r--r-- | contrib/epee/include/net/net_helper.h | 13 | ||||
-rw-r--r-- | contrib/epee/include/net/net_utils_base.h | 5 |
10 files changed, 211 insertions, 172 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index b38ab5399..3a2c5341d 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -49,10 +49,12 @@ #include <boost/asio/ssl.hpp> #include <boost/array.hpp> #include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include <boost/shared_ptr.hpp> //! \TODO Convert to std::shared_ptr #include <boost/enable_shared_from_this.hpp> #include <boost/interprocess/detail/atomic.hpp> #include <boost/thread/thread.hpp> +#include <memory> +#include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" #include "connection_basic.hpp" @@ -90,25 +92,24 @@ namespace net_utils public: typedef typename t_protocol_handler::connection_context t_connection_context; - struct shared_state : connection_basic_shared_state + struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type { shared_state() - : connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false) + : connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false) {} i_connection_filter* pfilter; - typename t_protocol_handler::config_type config; bool stop_signal_sent; }; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); explicit connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); @@ -135,8 +136,7 @@ namespace net_utils private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) - virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data + virtual bool do_send(byte_slice message); ///< (see do_send from i_service_endpoint) virtual bool send_done(); virtual bool close(); virtual bool call_run_once_service_io(); @@ -145,6 +145,8 @@ namespace net_utils virtual bool add_ref(); virtual bool release(); //------------------------------------------------------ + bool do_send_chunk(byte_slice chunk); ///< will send (or queue) a part of data. internal use only + boost::shared_ptr<connection<t_protocol_handler> > safe_shared_from_this(); bool shutdown(); /// Handle completion of a receive operation. @@ -269,7 +271,13 @@ namespace net_utils typename t_protocol_handler::config_type& get_config_object() { assert(m_state != nullptr); // always set in constructor - return m_state->config; + return *m_state; + } + + std::shared_ptr<typename t_protocol_handler::config_type> get_config_shared() + { + assert(m_state != nullptr); // always set in constructor + return {m_state}; } int get_binded_port(){return m_port;} @@ -350,7 +358,7 @@ namespace net_utils bool is_thread_worker(); - const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; + const std::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; /// The io_service used to perform asynchronous operations. struct worker diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 19e9c9af9..12a87071a 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -68,7 +68,7 @@ namespace epee namespace net_utils { template<typename T> - T& check_and_get(boost::shared_ptr<T>& ptr) + T& check_and_get(std::shared_ptr<T>& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; @@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) @@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), - m_protocol_handler(this, check_and_get(state).config, context), + m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), @@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); - //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; @@ -520,7 +519,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + bool connection<t_protocol_handler>::do_send(byte_slice message) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -529,6 +528,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (m_was_shutdown) return false; // TODO avoid copy + std::uint8_t const* const message_data = message.data(); + const std::size_t message_size = message.size(); + const double factor = 32; // TODO config typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); @@ -538,13 +540,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) CHECK_AND_ASSERT_MES(! (chunksize_max<0), false, "Negative chunksize_max" ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; - if (allow_split && (cb > chunksize_max_unsigned)) { + if (allow_split && (message_size > chunksize_max_unsigned)) { { // LOCK: chunking epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** - MDEBUG("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); - t_safe all = cb; // all bytes to send - t_safe pos = 0; // current sending position + MDEBUG("do_send() will SPLIT into small chunks, from packet="<<message_size<<" B for ptr="<<message_data); // 01234567890 // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 @@ -554,40 +554,25 @@ PRAGMA_WARNING_DISABLE_VS(4355) // char* buf = new char[ bufsize ]; bool all_ok = true; - while (pos < all) { - t_safe lenall = all-pos; // length from here to end - t_safe len = std::min( chunksize_good , lenall); // take a smaller part - CHECK_AND_ASSERT_MES(len<=chunksize_good, false, "len too large"); - // pos=8; len=4; all=10; len=3; - - CHECK_AND_ASSERT_MES(! (len<0), false, "negative len"); // check before we cast away sign: - unsigned long long int len_unsigned = static_cast<long long int>( len ); - CHECK_AND_ASSERT_MES(len>0, false, "len not strictly positive"); // (redundant) - CHECK_AND_ASSERT_MES(len_unsigned < std::numeric_limits<size_t>::max(), false, "Invalid len_unsigned"); // yeap we want strong < then max size, to be sure - - void *chunk_start = ((char*)ptr) + pos; - MDEBUG("chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); - CHECK_AND_ASSERT_MES(chunk_start >= ptr, false, "Pointer wraparound"); // not wrapped around address? - //std::memcpy( (void*)buf, chunk_start, len); - - MDEBUG("part of " << lenall << ": pos="<<pos << " len="<<len); - - bool ok = do_send_chunk(chunk_start, len); // <====== *** + while (!message.empty()) { + byte_slice chunk = message.take_slice(chunksize_good); + + MDEBUG("chunk_start="<<chunk.data()<<" ptr="<<message_data<<" pos="<<(chunk.data() - message_data)); + MDEBUG("part of " << message.size() << ": pos="<<(chunk.data() - message_data) << " len="<<chunk.size()); + + bool ok = do_send_chunk(std::move(chunk)); // <====== *** all_ok = all_ok && ok; if (!all_ok) { - MDEBUG("do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE ***FAILED*** from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() SEND was aborted in middle of big package - this is mostly harmless " - << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << message_size); return false; // partial failure in sending } - pos = pos+len; - CHECK_AND_ASSERT_MES(pos >0, false, "pos <= 0"); - // (in catch block, or uniq pointer) delete buf; } // each chunk - MDEBUG("do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE SPLIT from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() m_connection_type = " << m_connection_type); @@ -595,7 +580,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } // LOCK: chunking } // a big block (to be chunked) - all chunks else { // small block - return do_send_chunk(ptr,cb); // just send as 1 big chunk + return do_send_chunk(std::move(message)); // just send as 1 big chunk } CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); @@ -603,7 +588,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(byte_slice chunk) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -615,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); - m_throttle_speed_out.handle_trafic_exact(cb); + m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; @@ -623,7 +608,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //_info("[sock " << socket().native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); - context.m_send_cnt += cb; + context.m_send_cnt += chunk.size(); //some data should be wrote to stream //request complete @@ -644,7 +629,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) }*/ long int ms = 250 + (rand()%50); - MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<chunk.size()); // XXX debug sleep m_send_que_lock.unlock(); boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); m_send_que_lock.lock(); @@ -657,12 +642,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) } } - m_send_que.resize(m_send_que.size()+1); - m_send_que.back().assign((const char*)ptr, cb); - + m_send_que.push_back(std::move(chunk)); + if(m_send_que.size() > 1) { // active operation should be in progress, nothing to do, just wait last operation callback - auto size_now = cb; + auto size_now = m_send_que.back().size(); MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function @@ -680,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B"); if (speed_limit_is_enabled()) - do_send_handler_write( ptr , size_now ); // (((H))) + do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); reset_timer(get_default_timeout(), false); @@ -908,7 +892,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), @@ -927,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), diff --git a/contrib/epee/include/net/connection_basic.hpp b/contrib/epee/include/net/connection_basic.hpp index 2acc6cdda..2f60f7604 100644 --- a/contrib/epee/include/net/connection_basic.hpp +++ b/contrib/epee/include/net/connection_basic.hpp @@ -49,6 +49,7 @@ #include <boost/asio.hpp> #include <boost/asio/ssl.hpp> +#include "byte_slice.h" #include "net/net_utils_base.h" #include "net/net_ssl.h" #include "syncobj.h" @@ -99,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class class connection_basic { // not-templated base class for rapid developmet of some code parts // beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice - const boost::shared_ptr<connection_basic_shared_state> m_state; + const std::shared_ptr<connection_basic_shared_state> m_state; public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation @@ -108,7 +109,7 @@ class connection_basic { // not-templated base class for rapid developmet of som volatile uint32_t m_want_close_connection; std::atomic<bool> m_was_shutdown; critical_section m_send_que_lock; - std::list<std::string> m_send_que; + std::deque<byte_slice> m_send_que; volatile bool m_is_multithreaded; /// Strand to ensure the connection's handlers are not called concurrently. boost::asio::io_service::strand strand_; @@ -118,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som public: // first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator - connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); - connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); virtual ~connection_basic() noexcept(false); diff --git a/contrib/epee/include/net/enums.h b/contrib/epee/include/net/enums.h index 078a4b274..2f27d07f9 100644 --- a/contrib/epee/include/net/enums.h +++ b/contrib/epee/include/net/enums.h @@ -49,7 +49,7 @@ namespace net_utils { invalid = 0, public_ = 1, // public is keyword - i2p = 2, + i2p = 2, // order from here changes priority of selection for origin TXes tor = 3 }; diff --git a/contrib/epee/include/net/http_protocol_handler.inl b/contrib/epee/include/net/http_protocol_handler.inl index 790d0f3b1..19bdf4ff0 100644 --- a/contrib/epee/include/net/http_protocol_handler.inl +++ b/contrib/epee/include/net/http_protocol_handler.inl @@ -591,11 +591,12 @@ namespace net_utils std::string response_data = get_response_header(response); //LOG_PRINT_L0("HTTP_SEND: << \r\n" << response_data + response.m_body); - LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); - - m_psnd_hndlr->do_send((void*)response_data.data(), response_data.size()); + LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); + if ((response.m_body.size() && (query_info.m_http_method != http::http_method_head)) || (query_info.m_http_method == http::http_method_options)) - m_psnd_hndlr->do_send((void*)response.m_body.data(), response.m_body.size()); + response_data += response.m_body; + + m_psnd_hndlr->do_send(byte_slice{std::move(response_data)}); m_psnd_hndlr->send_done(); return res; } diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index a88a1eb49..f9b6f9a81 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -29,7 +29,11 @@ #ifndef _LEVIN_BASE_H_ #define _LEVIN_BASE_H_ +#include <cstdint> + +#include "byte_slice.h" #include "net_utils_base.h" +#include "span.h" #define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare @@ -72,6 +76,8 @@ namespace levin #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 +#define LEVIN_PACKET_BEGIN 0x00000004 +#define LEVIN_PACKET_END 0x00000008 #define LEVIN_PROTOCOL_VER_0 0 @@ -118,9 +124,30 @@ namespace levin } } + //! \return Intialized levin header. + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); + + /*! Generate a dummy levin message. + \param noise_bytes Total size of the returned `byte_slice`. + \return `nullptr` if `noise_size` is smaller than the levin header. + Otherwise, a dummy levin message. */ + byte_slice make_noise_notify(std::size_t noise_bytes); + + /*! Generate 1+ levin messages that are identical to the noise message size. + + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. + \return `nullptr` if `noise.size()` is less than the levin header size. + Otherwise, a levin notification message OR 2+ levin fragment messages. + Each message is `noise.size()` in length. */ + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); } } #endif //_LEVIN_BASE_H_ + diff --git a/contrib/epee/include/net/levin_protocol_handler.h b/contrib/epee/include/net/levin_protocol_handler.h index 791766762..c510cfd79 100644 --- a/contrib/epee/include/net/levin_protocol_handler.h +++ b/contrib/epee/include/net/levin_protocol_handler.h @@ -157,10 +157,9 @@ namespace levin m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(m_current_head.m_command, buff_to_invoke, return_buff, m_conn_context); m_current_head.m_cb = return_buff.size(); m_current_head.m_have_to_return_data = false; - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); - send_buff += return_buff; - if(!m_psnd_hndlr->do_send(send_buff.data(), send_buff.size())) + return_buff.insert(0, (const char*)&m_current_head, sizeof(m_current_head)); + if(!m_psnd_hndlr->do_send(byte_slice{std::move(return_buff)})) return false; } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 8d7ffb2c2..208911e1a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -32,6 +32,7 @@ #include <boost/smart_ptr/make_shared.hpp> #include <atomic> +#include <deque> #include "levin_base.h" #include "buffer.h" @@ -91,6 +92,7 @@ public: int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); + int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); bool request_callback(boost::uuids::uuid connection_id); @@ -117,6 +119,22 @@ public: template<class t_connection_context = net_utils::connection_context_base> class async_protocol_handler { + std::string m_fragment_buffer; + + bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: typedef t_connection_context connection_context; typedef async_protocol_handler_config<t_connection_context> config_type; @@ -136,7 +154,6 @@ public: critical_section m_local_inv_buff_lock; std::string m_local_inv_buff; - critical_section m_send_lock; critical_section m_call_lock; volatile uint32_t m_wait_count; @@ -376,7 +393,12 @@ public: return false; } - if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + // these should never fail, but do runtime check for safety + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + + // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public + if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb @@ -408,8 +430,38 @@ public: } break; } + { + std::string temp{}; epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb); + m_state = stream_state_head; + + // abstract_tcp_server2.h manages max bandwidth for a p2p link + if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE))) + { + // special noise/fragment command + static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END); + if ((m_current_head.m_flags & both_flags) == both_flags) + break; // noise message, skip to next message + + if (m_current_head.m_flags & LEVIN_PACKET_BEGIN) + m_fragment_buffer.clear(); + + m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size()); + if (!(m_current_head.m_flags & LEVIN_PACKET_END)) + break; // skip to next message + + if (m_fragment_buffer.size() < sizeof(bucket_head2)) + { + MERROR(m_connection_context << "Fragmented data too small for levin header"); + return false; + } + + temp = std::move(m_fragment_buffer); + m_fragment_buffer.clear(); + std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2)); + buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)}; + } bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); @@ -458,43 +510,33 @@ public: if(m_current_head.m_have_to_return_data) { std::string return_buff; - m_current_head.m_return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, - buff_to_invoke, - return_buff, - m_connection_context); - m_current_head.m_cb = return_buff.size(); - m_current_head.m_have_to_return_data = false; - m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1; - m_current_head.m_flags = LEVIN_PACKET_RESPONSE; -#if BYTE_ORDER == LITTLE_ENDIAN - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); -#else - bucket_head2 head = m_current_head; - head.m_signature = SWAP64LE(head.m_signature); - head.m_cb = SWAP64LE(head.m_cb); - head.m_command = SWAP32LE(head.m_command); - head.m_return_code = SWAP32LE(head.m_return_code); - head.m_flags = SWAP32LE(head.m_flags); - head.m_protocol_version = SWAP32LE(head.m_protocol_version); - std::string send_buff((const char*)&head, sizeof(head)); -#endif - send_buff += return_buff; - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size())) + const uint32_t return_code = m_config.m_pcommands_handler->invoke( + m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + ); + + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); + + if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; - CRITICAL_REGION_END(); - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb - << ", flags" << m_current_head.m_flags - << ", r?=" << m_current_head.m_have_to_return_data - <<", cmd = " << m_current_head.m_command - << ", ver=" << m_current_head.m_protocol_version); + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); } + // reuse small buffer + if (!temp.empty() && temp.capacity() <= 64 * 1024) + { + temp.clear(); + m_fragment_buffer = std::move(temp); + } } - m_state = stream_state_head; break; case stream_state_head: { @@ -584,26 +626,10 @@ public: break; } - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - err_code = LEVIN_ERROR_CONNECTION; - break; - } + CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -620,7 +646,7 @@ public: if (LEVIN_OK != err_code) { - epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0}; + epee::span<const uint8_t> stub_buff = nullptr; // Never call callback inside critical section, that can cause deadlock cb(err_code, stub_buff, m_connection_context); return false; @@ -642,35 +668,13 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - return LEVIN_ERROR_CONNECTION; - } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; } - CRITICAL_REGION_END(); - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", f=" << head.m_flags - << ", r?=" << head.m_have_to_return_data - << ", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); uint64_t ticks_start = misc_utils::get_tick_count(); size_t prev_size = 0; @@ -716,33 +720,38 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = false; - head.m_cb = SWAP64LE(in_buff.size()); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); return -1; } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + return 1; + } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for + `make_fragmented_notify`. + + \return 1 on success */ + int send(byte_slice message) + { + const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this) + ); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } - CRITICAL_REGION_END(); - LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb << - ", f=" << head.m_flags << - ", r?=" << head.m_have_to_return_data << - ", cmd = " << head.m_command << - ", ver=" << head.m_protocol_version); + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -923,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con } //------------------------------------------------------------------------------------------ template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->send(std::move(message)) : 0; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id) { CRITICAL_REGION_LOCAL(m_connects_lock); diff --git a/contrib/epee/include/net/net_helper.h b/contrib/epee/include/net/net_helper.h index e315555fc..2b02eafa4 100644 --- a/contrib/epee/include/net/net_helper.h +++ b/contrib/epee/include/net/net_helper.h @@ -31,6 +31,7 @@ //#include <Winsock2.h> //#include <Ws2tcpip.h> +#include <atomic> #include <string> #include <boost/version.hpp> #include <boost/asio/io_service.hpp> @@ -154,7 +155,7 @@ namespace net_utils } inline - try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support) + try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout) { m_deadline.expires_from_now(timeout); boost::unique_future<boost::asio::ip::tcp::socket> connection = m_connector(addr, port, m_deadline); @@ -174,11 +175,11 @@ namespace net_utils m_connected = true; m_deadline.expires_at(std::chrono::steady_clock::time_point::max()); // SSL Options - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr)) { - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); @@ -217,7 +218,7 @@ namespace net_utils // Get a list of endpoints corresponding to the server name. - try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support); + try_connect_result_t try_connect_result = try_connect(addr, port, timeout); if (try_connect_result == CONNECT_FAILURE) return false; if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) @@ -226,7 +227,7 @@ namespace net_utils { MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; - if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS) + if (try_connect(addr, port, timeout) != CONNECT_SUCCESS) return false; } } @@ -562,7 +563,7 @@ namespace net_utils { m_deadline.cancel(); boost::system::error_code ec; - if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled) + if(m_ssl_options) shutdown_ssl(); m_ssl_socket->next_layer().cancel(ec); if(ec) diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 5ae3e53b3..dd80fae8b 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -34,9 +34,10 @@ #include <boost/asio/ip/address_v6.hpp> #include <typeinfo> #include <type_traits> +#include "byte_slice.h" #include "enums.h" -#include "serialization/keyvalue_serialization.h" #include "misc_log_ex.h" +#include "serialization/keyvalue_serialization.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" @@ -424,7 +425,7 @@ namespace net_utils /************************************************************************/ struct i_service_endpoint { - virtual bool do_send(const void* ptr, size_t cb)=0; + virtual bool do_send(byte_slice message)=0; virtual bool close()=0; virtual bool send_done()=0; virtual bool call_run_once_service_io()=0; |