diff options
-rw-r--r-- | contrib/epee/include/net/levin_base.h | 3 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 32 | ||||
-rw-r--r-- | src/cryptonote_basic/connection_context.h | 2 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.inl | 1 | ||||
-rw-r--r-- | src/cryptonote_protocol/levin_notify.cpp | 2 | ||||
-rw-r--r-- | src/p2p/net_node.h | 3 | ||||
-rw-r--r-- | tests/fuzz/levin.cpp | 2 | ||||
-rw-r--r-- | tests/net_load_tests/net_load_tests.h | 2 | ||||
-rw-r--r-- | tests/unit_tests/epee_levin_protocol_handler_async.cpp | 3 | ||||
-rw-r--r-- | tests/unit_tests/levin.cpp | 1 |
10 files changed, 40 insertions, 11 deletions
diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index f9b6f9a81..2c96c47b9 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -72,7 +72,8 @@ namespace levin #define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED 0 -#define LEVIN_DEFAULT_MAX_PACKET_SIZE 100000000 //100MB by default +#define LEVIN_INITIAL_MAX_PACKET_SIZE 256*1024 // 256 KiB before handshake +#define LEVIN_DEFAULT_MAX_PACKET_SIZE 100000000 //100MB by default after handshake #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 1341a4ae6..f77f5d0a1 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -84,7 +84,8 @@ class async_protocol_handler_config public: typedef t_connection_context connection_context; - uint64_t m_max_packet_size; + uint64_t m_initial_max_packet_size; + uint64_t m_max_packet_size; uint64_t m_invoke_timeout; int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id); @@ -105,7 +106,7 @@ public: size_t get_in_connections_count(); void set_handler(levin_commands_handler<t_connection_context>* handler, void (*destroy)(levin_commands_handler<t_connection_context>*) = NULL); - async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_initial_max_packet_size(LEVIN_INITIAL_MAX_PACKET_SIZE), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) {} ~async_protocol_handler_config() { set_handler(NULL, NULL); } void del_out_connections(size_t count); @@ -162,6 +163,7 @@ public: net_utils::i_service_endpoint* m_pservice_endpoint; config_type& m_config; t_connection_context& m_connection_context; + std::atomic<uint64_t> m_max_packet_size; net_utils::buffer m_cache_in_buffer; stream_state m_state; @@ -289,7 +291,8 @@ public: m_current_head(bucket_head2()), m_pservice_endpoint(psnd_hndlr), m_config(config), - m_connection_context(conn_context), + m_connection_context(conn_context), + m_max_packet_size(config.m_initial_max_packet_size), m_cache_in_buffer(4 * 1024), m_state(stream_state_head) { @@ -399,13 +402,14 @@ public: } // these should never fail, but do runtime check for safety - CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); - CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + const uint64_t max_packet_size = m_max_packet_size; + CHECK_AND_ASSERT_MES(max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public - if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) + if(cb > max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { - MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size + MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb << ", connection will be closed."); return false; @@ -519,6 +523,10 @@ public: m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context ); + // peer_id remains unset if dropped + if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete()) + m_max_packet_size = m_config.m_max_packet_size; + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); head.m_return_code = SWAP32LE(return_code); return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); @@ -577,9 +585,9 @@ public: m_cache_in_buffer.erase(sizeof(bucket_head2)); m_state = stream_state_body; m_oponent_protocol_ver = m_current_head.m_protocol_version; - if(m_current_head.m_cb > m_config.m_max_packet_size) + if(m_current_head.m_cb > max_packet_size) { - LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size + LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size << ", packet header received " << m_current_head.m_cb << ", connection will be closed."); return false; @@ -634,6 +642,9 @@ public: boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); + if (command == m_connection_context.handshake_command()) + m_max_packet_size = m_config.m_max_packet_size; + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); @@ -675,6 +686,9 @@ public: boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); + if (command == m_connection_context.handshake_command()) + m_max_packet_size = m_config.m_max_packet_size; + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to send request"); diff --git a/src/cryptonote_basic/connection_context.h b/src/cryptonote_basic/connection_context.h index 9e012f8f5..e5c00d4f3 100644 --- a/src/cryptonote_basic/connection_context.h +++ b/src/cryptonote_basic/connection_context.h @@ -55,6 +55,8 @@ namespace cryptonote state_normal }; + bool handshake_complete() const noexcept { return m_state != state_before_handshake; } + state m_state; std::vector<std::pair<crypto::hash, uint64_t>> m_needed_objects; std::unordered_set<crypto::hash> m_requested_objects; diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index c76e5628d..1a4c6d033 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -2634,6 +2634,7 @@ skip: std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> fullConnections, fluffyConnections; m_p2p->for_each_connection([this, &exclude_context, &fullConnections, &fluffyConnections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags) { + // peer_id also filters out connections before handshake if (peer_id && exclude_context.m_connection_id != context.m_connection_id && context.m_remote_address.get_zone() == epee::net_utils::zone::public_) { if(m_core.fluffy_blocks_enabled() && (support_flags & P2P_SUPPORT_FLAG_FLUFFY_BLOCKS)) diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index 69df22a92..ab4eeeb82 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -442,7 +442,7 @@ namespace levin zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) { // When i2p/tor, only fluff to outbound connections - if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) + if (context.handshake_complete() && source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) { if (context.fluff_txs.empty()) context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index 9fba5d636..1f9fa46a4 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -118,6 +118,8 @@ namespace nodetool m_in_timedsync(false) {} + static constexpr int handshake_command() noexcept { return 1001; } + std::vector<cryptonote::blobdata> fluff_txs; std::chrono::steady_clock::time_point flush_time; peerid_type peer_id; @@ -139,6 +141,7 @@ namespace nodetool typedef COMMAND_HANDSHAKE_T<typename t_payload_net_handler::payload_type> COMMAND_HANDSHAKE; typedef COMMAND_TIMED_SYNC_T<typename t_payload_net_handler::payload_type> COMMAND_TIMED_SYNC; + static_assert(p2p_connection_context::handshake_command() == COMMAND_HANDSHAKE::ID, "invalid handshake command id"); typedef epee::net_utils::boosted_tcp_server<epee::levin::async_protocol_handler<p2p_connection_context>> net_server; diff --git a/tests/fuzz/levin.cpp b/tests/fuzz/levin.cpp index 012d05f36..0ba0ff7f3 100644 --- a/tests/fuzz/levin.cpp +++ b/tests/fuzz/levin.cpp @@ -52,6 +52,8 @@ namespace struct test_levin_connection_context : public epee::net_utils::connection_context_base { + static constexpr int handshake_command() noexcept { return 1001; } + static constexpr bool handshake_complete() noexcept { return true; } }; typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config; diff --git a/tests/net_load_tests/net_load_tests.h b/tests/net_load_tests/net_load_tests.h index 882d42c02..1cc68746a 100644 --- a/tests/net_load_tests/net_load_tests.h +++ b/tests/net_load_tests/net_load_tests.h @@ -48,6 +48,8 @@ namespace net_load_tests struct test_connection_context : epee::net_utils::connection_context_base { test_connection_context(): epee::net_utils::connection_context_base(boost::uuids::nil_uuid(), {}, false, false), m_closed(false) {} + static constexpr int handshake_command() noexcept { return 1001; } + static constexpr bool handshake_complete() noexcept { return true; } volatile bool m_closed; }; diff --git a/tests/unit_tests/epee_levin_protocol_handler_async.cpp b/tests/unit_tests/epee_levin_protocol_handler_async.cpp index 314fcf9f2..55092fd90 100644 --- a/tests/unit_tests/epee_levin_protocol_handler_async.cpp +++ b/tests/unit_tests/epee_levin_protocol_handler_async.cpp @@ -43,6 +43,8 @@ namespace { struct test_levin_connection_context : public epee::net_utils::connection_context_base { + static constexpr int handshake_command() noexcept { return 1001; } + static constexpr bool handshake_complete() noexcept { return true; } }; typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config; @@ -194,6 +196,7 @@ namespace { m_handler_config.set_handler(m_pcommands_handler, [](epee::levin::levin_commands_handler<test_levin_connection_context> *handler) { delete handler; }); m_handler_config.m_invoke_timeout = invoke_timeout; + m_handler_config.m_initial_max_packet_size = max_packet_size; m_handler_config.m_max_packet_size = max_packet_size; } diff --git a/tests/unit_tests/levin.cpp b/tests/unit_tests/levin.cpp index 22638942d..76e3282e0 100644 --- a/tests/unit_tests/levin.cpp +++ b/tests/unit_tests/levin.cpp @@ -178,6 +178,7 @@ namespace { using base_type = epee::net_utils::connection_context_base; static_cast<base_type&>(context_) = base_type{random_generator(), {}, is_incoming, false}; + context_.m_state = cryptonote::cryptonote_connection_context::state_normal; handler_.after_init_connection(); } |