diff options
author | Antonio Juarez <antonio.maria.juarez@live.com> | 2014-03-03 22:07:58 +0000 |
---|---|---|
committer | Antonio Juarez <antonio.maria.juarez@live.com> | 2014-03-03 22:07:58 +0000 |
commit | 296ae46ed8f8f6e5f986f978febad302e3df231a (patch) | |
tree | 1629164454a239308f33c9e12afb22e7f3cd8eeb /tests/net_load_tests | |
parent | changed name (diff) | |
download | monero-296ae46ed8f8f6e5f986f978febad302e3df231a.tar.xz |
moved all stuff to github
Diffstat (limited to '')
-rw-r--r-- | tests/net_load_tests/clt.cpp | 603 | ||||
-rw-r--r-- | tests/net_load_tests/net_load_tests.h | 329 | ||||
-rw-r--r-- | tests/net_load_tests/srv.cpp | 211 |
3 files changed, 1143 insertions, 0 deletions
diff --git a/tests/net_load_tests/clt.cpp b/tests/net_load_tests/clt.cpp new file mode 100644 index 000000000..45c1d0859 --- /dev/null +++ b/tests/net_load_tests/clt.cpp @@ -0,0 +1,603 @@ +// Copyright (c) 2012-2013 The Cryptonote developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include <atomic> +#include <chrono> +#include <functional> +#include <numeric> +#include <thread> +#include <vector> + +#include "gtest/gtest.h" + +#include "include_base_utils.h" +#include "misc_log_ex.h" +#include "storages/levin_abstract_invoke2.h" + +#include "net_load_tests.h" + +using namespace net_load_tests; + +namespace +{ + const size_t CONNECTION_COUNT = 100000; + const size_t CONNECTION_TIMEOUT = 10000; + const size_t DEFAULT_OPERATION_TIMEOUT = 30000; + const size_t RESERVED_CONN_CNT = 1; + + template<typename t_predicate> + bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10) + { + for (size_t i = 0; i < timeout_ms / sleep_ms; ++i) + { + if (predicate()) + return true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + return false; + } + + class t_connection_opener_1 + { + public: + t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target) + : m_tcp_server(tcp_server) + , m_open_request_target(open_request_target) + , m_next_id(0) + , m_error_count(0) + , m_connections(open_request_target) + { + for (auto& conn_id : m_connections) + conn_id = boost::uuids::nil_uuid(); + } + + bool open() + { + size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed); + if (m_open_request_target <= id) + return false; + + bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) { + if (!ec) + { + m_connections[id] = context.m_connection_id; + } + else + { + m_error_count.fetch_add(1, std::memory_order_relaxed); + } + }); + + if (!r) + { + m_error_count.fetch_add(1, std::memory_order_relaxed); + } + + return true; + } + + bool close(size_t id) + { + if (!m_connections[id].is_nil()) + { + m_tcp_server.get_config_object().close(m_connections[id]); + return true; + } + else + { + return false; + } + } + + size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); } + + private: + test_tcp_server& m_tcp_server; + size_t m_open_request_target; + std::atomic<size_t> m_next_id; + std::atomic<size_t> m_error_count; + std::vector<boost::uuids::uuid> m_connections; + }; + + class t_connection_opener_2 + { + public: + t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count) + : m_tcp_server(tcp_server) + , m_open_request_target(open_request_target) + , m_open_request_count(0) + , m_error_count(0) + , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count) + { + } + + bool open_and_close() + { + size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed); + if (m_open_request_target <= req_count) + return false; + + bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) { + if (!ec) + { + m_open_close_test_helper.handle_new_connection(context.m_connection_id); + } + else + { + m_error_count.fetch_add(1, std::memory_order_relaxed); + } + }); + + if (!r) + { + m_error_count.fetch_add(1, std::memory_order_relaxed); + } + + return true; + } + + void close_remaining_connections() + { + m_open_close_test_helper.close_remaining_connections(); + } + + size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); } + size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); } + + private: + test_tcp_server& m_tcp_server; + size_t m_open_request_target; + std::atomic<size_t> m_open_request_count; + std::atomic<size_t> m_error_count; + open_close_test_helper m_open_close_test_helper; + }; + + class net_load_test_clt : public ::testing::Test + { + protected: + virtual void SetUp() + { + m_thread_count = (std::max)(min_thread_count, std::thread::hardware_concurrency() / 2); + + m_tcp_server.get_config_object().m_pcommands_handler = &m_commands_handler; + m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT; + + ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1")); + ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false)); + + // Connect to server + std::atomic<int> conn_status(0); + m_cmd_conn_id = boost::uuids::nil_uuid(); + ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) { + if (!ec) + { + m_cmd_conn_id = context.m_connection_id; + } + else + { + LOG_ERROR("Connection error: " << ec.message()); + } + conn_status.store(1, std::memory_order_seq_cst); + })); + + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out"; + ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst)); + ASSERT_FALSE(m_cmd_conn_id.is_nil()); + + conn_status.store(0, std::memory_order_seq_cst); + CMD_RESET_STATISTICS::request req; + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req, + m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) { + conn_status.store(code, std::memory_order_seq_cst); + })); + + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out"; + ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst)); + } + + virtual void TearDown() + { + m_tcp_server.send_stop_signal(); + ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT)); + } + + static void TearDownTestCase() + { + // Stop server + test_levin_commands_handler commands_handler; + test_tcp_server tcp_server; + tcp_server.get_config_object().m_pcommands_handler = &commands_handler; + tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT; + + if (!tcp_server.init_server(clt_port, "127.0.0.1")) return; + if (!tcp_server.run_server(2, false)) return; + + // Connect to server and invoke shutdown command + std::atomic<int> conn_status(0); + boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid(); + tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) { + cmd_conn_id = context.m_connection_id; + conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst); + }); + + if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return; + if (1 != conn_status.load(std::memory_order_seq_cst)) return; + + epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object()); + + busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); }); + } + + template<typename Func> + static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func()) + { + func(); + } + + template<typename Func> + static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index)) + { + func(thread_index); + } + + template<typename Func> + void parallel_exec(const Func& func) + { + unit_test::call_counter properly_finished_threads; + std::vector<std::thread> threads(m_thread_count); + for (size_t i = 0; i < threads.size(); ++i) + { + threads[i] = std::thread([&, i] { + call_func(i, func, 0); + properly_finished_threads.inc(); + }); + } + + for (auto& th : threads) + th.join(); + + ASSERT_EQ(properly_finished_threads.get(), m_thread_count); + } + + void get_server_statistics(CMD_GET_STATISTICS::response& statistics) + { + std::atomic<int> req_status(0); + CMD_GET_STATISTICS::request req; + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req, + m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) { + if (0 < code) + { + statistics = rsp; + } + else + { + LOG_ERROR("Get server statistics error: " << code); + } + req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst); + })); + + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out"; + ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst)); + } + + template <typename t_predicate> + bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate) + { + for (size_t i = 0; i < 30; ++i) + { + get_server_statistics(statistics); + if (predicate(statistics)) + { + return true; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + return false; + } + + void ask_for_data_requests(size_t request_size = 0) + { + CMD_SEND_DATA_REQUESTS::request req; + req.request_size = request_size; + epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object()); + } + + protected: + test_tcp_server m_tcp_server; + test_levin_commands_handler m_commands_handler; + size_t m_thread_count; + boost::uuids::uuid m_cmd_conn_id; + }; +} + +TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client) +{ + // Open connections + t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT); + parallel_exec([&] { + while (connection_opener.open()); + }); + + // Wait for all open requests to complete + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); })); + LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() << + " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")"); + + // Check + ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count()); + + // Close connections + parallel_exec([&](size_t thread_idx) { + for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count) + { + connection_opener.close(i); + } + }); + + // Wait for all opened connections to close + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); })); + LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() << + " / " << m_commands_handler.close_connection_counter()); + + // Check all connections are closed + ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter()); + ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); + + // Wait for server to handle all open and close requests + CMD_GET_STATISTICS::response srv_stat; + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status + // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets + ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count); + + // Request data from server, it causes to close rest connections + ask_for_data_requests(); + + // Wait for server to close rest connections + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status. All connections should be closed + ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count); +} + +TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server) +{ + // Open connections + t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT); + parallel_exec([&] { + while (connection_opener.open()); + }); + + // Wait for all open requests to complete + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); })); + LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() << + " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")"); + + // Check + ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count()); + + // Wait for server accepts all connections + CMD_GET_STATISTICS::response srv_stat; + int last_new_connection_counter = -1; + busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) { + if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true; + else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; } + }); + + // Close connections + CMD_CLOSE_ALL_CONNECTIONS::request req; + ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); + + // Wait for all opened connections to close + busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }); + LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() << + " / " << m_commands_handler.close_connection_counter()); + + // It's OK, if server didn't close all connections, because it could accept not all our connections + ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT); + ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); + + // Wait for server to handle all open and close requests + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status + ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count); + + // Close rest connections + m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { + if (ctx.m_connection_id != m_cmd_conn_id) + { + CMD_DATA_REQUEST::request req; + bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req, + m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { + if (code <= 0) + { + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code); + } + }); + if (!r) + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST"); + } + return true; + }); + + // Wait for all opened connections to close + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); })); + LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() << + " / " << m_commands_handler.close_connection_counter()); + + // Check + ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); +} + +TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client) +{ + static const size_t MAX_OPENED_CONN_COUNT = 100; + + // Open/close connections + t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT); + parallel_exec([&] { + while (connection_opener.open_and_close()); + }); + + // Wait for all open requests to complete + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); })); + LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() << + " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")"); + + // Check + ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT); + + // Wait for all close requests to complete + EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; })); + LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count()); + + // Check + ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count()); + + connection_opener.close_remaining_connections(); + + // Wait for all close requests to complete + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; })); + LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count()); + + ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT); + ASSERT_EQ(0, connection_opener.opened_connection_count()); + ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); + + // Wait for server to handle all open and close requests + CMD_GET_STATISTICS::response srv_stat; + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status + // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets + ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count); + + // Request data from server, it causes to close rest connections + ask_for_data_requests(); + + // Wait for server to close rest connections + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status. All connections should be closed + ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count); +} + +TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server) +{ + static const size_t MAX_OPENED_CONN_COUNT = 100; + + // Init test + std::atomic<int> test_state(0); + CMD_START_OPEN_CLOSE_TEST::request req_start; + req_start.open_request_target = CONNECTION_COUNT; + req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT; + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start, + m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) { + test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst); + })); + + // Wait for server response + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); })); + ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst)); + + // Open connections + t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT); + parallel_exec([&] { + while (connection_opener.open()); + }); + + // Wait for all open requests to complete + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); })); + LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() << + " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")"); + LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count()); + + ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT); + ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT); + + // Wait for server accepts all connections + CMD_GET_STATISTICS::response srv_stat; + int last_new_connection_counter = -1; + busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) { + if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true; + else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; } + }); + + // Ask server to close rest connections + CMD_CLOSE_ALL_CONNECTIONS::request req; + ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); + + // Wait for almost all connections to be closed by server + busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }); + + // It's OK, if there are opened connections, because server could accept not all our connections + ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter()); + ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); + + // Wait for server to handle all open and close requests + busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; }); + LOG_PRINT_L0("server statistics: " << srv_stat.to_string()); + + // Check server status + ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count); + + // Close rest connections + m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { + if (ctx.m_connection_id != m_cmd_conn_id) + { + CMD_DATA_REQUEST::request req; + bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req, + m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { + if (code <= 0) + { + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code); + } + }); + if (!r) + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST"); + } + return true; + }); + + // Wait for all opened connections to close + EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); })); + LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() << + " / " << m_commands_handler.close_connection_counter()); + + // Check + ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT); + ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count()); +} + +int main(int argc, char** argv) +{ + epee::debug::get_set_enable_assert(true, false); + //set up logging options + epee::log_space::get_set_log_detalisation_level(true, LOG_LEVEL_0); + epee::log_space::log_singletone::add_logger(LOGGER_CONSOLE, NULL, NULL); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/net_load_tests/net_load_tests.h b/tests/net_load_tests/net_load_tests.h new file mode 100644 index 000000000..20da11bf3 --- /dev/null +++ b/tests/net_load_tests/net_load_tests.h @@ -0,0 +1,329 @@ +// Copyright (c) 2012-2013 The Cryptonote developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#pragma once + +#include <atomic> + +#include <boost/asio/io_service.hpp> + +#include "include_base_utils.h" +#include "string_tools.h" +#include "net/levin_protocol_handler_async.h" +#include "net/abstract_tcp_server2.h" +#include "serialization/keyvalue_serialization.h" + +#include "../unit_tests/unit_tests_utils.h" + +namespace net_load_tests +{ + struct test_connection_context : epee::net_utils::connection_context_base + { + volatile bool m_closed; + }; + + typedef epee::levin::async_protocol_handler<test_connection_context> test_levin_protocol_handler; + typedef epee::levin::async_protocol_handler_config<test_connection_context> test_levin_protocol_handler_config; + typedef epee::net_utils::connection<test_levin_protocol_handler> test_connection; + typedef epee::net_utils::boosted_tcp_server<test_levin_protocol_handler> test_tcp_server; + + struct test_levin_commands_handler : public epee::levin::levin_commands_handler<test_connection_context> + { + test_levin_commands_handler() + //: m_return_code(LEVIN_OK) + //, m_last_command(-1) + { + } + + virtual int invoke(int command, const std::string& in_buff, std::string& buff_out, test_connection_context& context) + { + //m_invoke_counter.inc(); + //std::unique_lock<std::mutex> lock(m_mutex); + //m_last_command = command; + //m_last_in_buf = in_buff; + //buff_out = m_invoke_out_buf; + //return m_return_code; + return LEVIN_OK; + } + + virtual int notify(int command, const std::string& in_buff, test_connection_context& context) + { + //m_notify_counter.inc(); + //std::unique_lock<std::mutex> lock(m_mutex); + //m_last_command = command; + //m_last_in_buf = in_buff; + //return m_return_code; + return LEVIN_OK; + } + + virtual void callback(test_connection_context& context) + { + //m_callback_counter.inc(); + //std::cout << "test_levin_commands_handler::callback()" << std::endl; + } + + virtual void on_connection_new(test_connection_context& context) + { + m_new_connection_counter.inc(); + //std::cout << "test_levin_commands_handler::on_connection_new()" << std::endl; + } + + virtual void on_connection_close(test_connection_context& context) + { + m_close_connection_counter.inc(); + //std::cout << "test_levin_commands_handler::on_connection_close()" << std::endl; + } + + //size_t invoke_counter() const { return m_invoke_counter.get(); } + //size_t notify_counter() const { return m_notify_counter.get(); } + //size_t callback_counter() const { return m_callback_counter.get(); } + size_t new_connection_counter() const { return m_new_connection_counter.get(); } + size_t close_connection_counter() const { return m_close_connection_counter.get(); } + + //int return_code() const { return m_return_code; } + //void return_code(int v) { m_return_code = v; } + + //const std::string& invoke_out_buf() const { return m_invoke_out_buf; } + //void invoke_out_buf(const std::string& v) { m_invoke_out_buf = v; } + + //int last_command() const { return m_last_command; } + //const std::string& last_in_buf() const { return m_last_in_buf; } + + protected: + //unit_test::call_counter m_invoke_counter; + //unit_test::call_counter m_notify_counter; + //unit_test::call_counter m_callback_counter; + unit_test::call_counter m_new_connection_counter; + unit_test::call_counter m_close_connection_counter; + + //std::mutex m_mutex; + + //int m_return_code; + //std::string m_invoke_out_buf; + + //int m_last_command; + //std::string m_last_in_buf; + }; + + class open_close_test_helper + { + public: + open_close_test_helper(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count) + : m_tcp_server(tcp_server) + , m_open_request_target(open_request_target) + , m_max_opened_connection_count(max_opened_connection_count) + , m_opened_connection_count(0) + , m_next_opened_conn_idx(0) + , m_next_closed_conn_idx(0) + , m_connections(open_request_target) + { + for (auto& conn_id : m_connections) + conn_id = boost::uuids::nil_uuid(); + } + + bool handle_new_connection(const boost::uuids::uuid& connection_id, bool ignore_close_fails = false) + { + size_t idx = m_next_opened_conn_idx.fetch_add(1, std::memory_order_relaxed); + m_connections[idx] = connection_id; + + size_t prev_connection_count = m_opened_connection_count.fetch_add(1, std::memory_order_relaxed); + if (m_max_opened_connection_count <= prev_connection_count) + { + return close_next_connection(ignore_close_fails); + } + + return true; + } + + void close_remaining_connections() + { + while (close_next_connection(false)); + } + + bool close_next_connection(bool ignore_close_fails) + { + size_t idx = m_next_closed_conn_idx.fetch_add(1, std::memory_order_relaxed); + if (m_next_opened_conn_idx.load(std::memory_order_relaxed) <= idx) + { + LOG_PRINT_L0("Not enough opened connections"); + return false; + } + if (m_connections[idx].is_nil()) + { + LOG_PRINT_L0("Connection isn't opened"); + return false; + } + if (!m_tcp_server.get_config_object().close(m_connections[idx])) + { + LOG_PRINT_L0("Close connection error: " << m_connections[idx]); + if (!ignore_close_fails) + { + return false; + } + } + + m_connections[idx] = boost::uuids::nil_uuid(); + m_opened_connection_count.fetch_sub(1, std::memory_order_relaxed); + return true; + } + + size_t opened_connection_count() const { return m_opened_connection_count.load(std::memory_order_relaxed); } + + private: + test_tcp_server& m_tcp_server; + size_t m_open_request_target; + size_t m_max_opened_connection_count; + std::atomic<size_t> m_opened_connection_count; + std::atomic<size_t> m_next_opened_conn_idx; + std::atomic<size_t> m_next_closed_conn_idx; + std::vector<boost::uuids::uuid> m_connections; + }; + + const unsigned int min_thread_count = 2; + const std::string clt_port("36230"); + const std::string srv_port("36231"); + + enum command_ids + { + cmd_close_all_connections_id = 73564, + cmd_start_open_close_test_id, + cmd_get_statistics_id, + cmd_reset_statistics_id, + cmd_shutdown_id, + cmd_send_data_requests_id, + cmd_data_request_id + }; + + struct CMD_CLOSE_ALL_CONNECTIONS + { + const static int ID = cmd_close_all_connections_id; + + struct request + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + }; + + struct CMD_START_OPEN_CLOSE_TEST + { + const static int ID = cmd_start_open_close_test_id; + + struct request + { + size_t open_request_target; + size_t max_opened_conn_count; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE(open_request_target) + KV_SERIALIZE(max_opened_conn_count) + END_KV_SERIALIZE_MAP() + }; + + struct response + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + }; + + struct CMD_GET_STATISTICS + { + const static int ID = cmd_get_statistics_id; + + struct request + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + + struct response + { + size_t opened_connections_count; + size_t new_connection_counter; + size_t close_connection_counter; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE(opened_connections_count) + KV_SERIALIZE(new_connection_counter) + KV_SERIALIZE(close_connection_counter) + END_KV_SERIALIZE_MAP() + + std::string to_string() const + { + std::stringstream ss; + ss << "opened_connections_count = " << opened_connections_count << + ", new_connection_counter = " << new_connection_counter << + ", close_connection_counter = " << close_connection_counter; + return ss.str(); + } + }; + }; + + struct CMD_RESET_STATISTICS + { + const static int ID = cmd_reset_statistics_id; + + struct request + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + + struct response + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + }; + + struct CMD_SHUTDOWN + { + const static int ID = cmd_shutdown_id; + + struct request + { + BEGIN_KV_SERIALIZE_MAP() + END_KV_SERIALIZE_MAP() + }; + }; + + struct CMD_SEND_DATA_REQUESTS + { + const static int ID = cmd_send_data_requests_id; + + struct request + { + size_t request_size; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE(request_size) + END_KV_SERIALIZE_MAP() + }; + }; + + struct CMD_DATA_REQUEST + { + const static int ID = cmd_data_request_id; + + struct request + { + std::string data; + size_t response_size; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE(data) + END_KV_SERIALIZE_MAP() + }; + + struct response + { + std::string data; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE(data) + END_KV_SERIALIZE_MAP() + }; + }; +} diff --git a/tests/net_load_tests/srv.cpp b/tests/net_load_tests/srv.cpp new file mode 100644 index 000000000..52895c9dd --- /dev/null +++ b/tests/net_load_tests/srv.cpp @@ -0,0 +1,211 @@ +// Copyright (c) 2012-2013 The Cryptonote developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include <mutex> +#include <thread> + +#include "include_base_utils.h" +#include "misc_log_ex.h" +#include "storages/levin_abstract_invoke2.h" + +#include "net_load_tests.h" + +using namespace net_load_tests; + +#define EXIT_ON_ERROR(cond) { if (!(cond)) { LOG_PRINT_L0("ERROR: " << #cond); exit(1); } else {} } + +namespace +{ + struct srv_levin_commands_handler : public test_levin_commands_handler + { + srv_levin_commands_handler(test_tcp_server& tcp_server) + : m_tcp_server(tcp_server) + , m_open_close_test_conn_id(boost::uuids::nil_uuid()) + { + } + + virtual void on_connection_new(test_connection_context& context) + { + test_levin_commands_handler::on_connection_new(context); + context.m_closed = false; + + //std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + std::unique_lock<std::mutex> lock(m_open_close_test_mutex); + if (!m_open_close_test_conn_id.is_nil()) + { + EXIT_ON_ERROR(m_open_close_test_helper->handle_new_connection(context.m_connection_id, true)); + } + } + + virtual void on_connection_close(test_connection_context& context) + { + test_levin_commands_handler::on_connection_close(context); + + std::unique_lock<std::mutex> lock(m_open_close_test_mutex); + if (context.m_connection_id == m_open_close_test_conn_id) + { + LOG_PRINT_L0("Stop open/close test"); + m_open_close_test_conn_id = boost::uuids::nil_uuid(); + m_open_close_test_helper.reset(0); + } + } + + CHAIN_LEVIN_INVOKE_MAP2(test_connection_context); + CHAIN_LEVIN_NOTIFY_MAP2(test_connection_context); + + BEGIN_INVOKE_MAP2(srv_levin_commands_handler) + HANDLE_NOTIFY_T2(CMD_CLOSE_ALL_CONNECTIONS, &srv_levin_commands_handler::handle_close_all_connections) + HANDLE_NOTIFY_T2(CMD_SHUTDOWN, &srv_levin_commands_handler::handle_shutdown) + HANDLE_NOTIFY_T2(CMD_SEND_DATA_REQUESTS, &srv_levin_commands_handler::handle_send_data_requests) + HANDLE_INVOKE_T2(CMD_GET_STATISTICS, &srv_levin_commands_handler::handle_get_statistics) + HANDLE_INVOKE_T2(CMD_RESET_STATISTICS, &srv_levin_commands_handler::handle_reset_statistics) + HANDLE_INVOKE_T2(CMD_START_OPEN_CLOSE_TEST, &srv_levin_commands_handler::handle_start_open_close_test) + END_INVOKE_MAP2() + + int handle_close_all_connections(int command, const CMD_CLOSE_ALL_CONNECTIONS::request& req, test_connection_context& context) + { + close_connections(context.m_connection_id); + return 1; + } + + int handle_get_statistics(int command, const CMD_GET_STATISTICS::request&, CMD_GET_STATISTICS::response& rsp, test_connection_context& /*context*/) + { + rsp.opened_connections_count = m_tcp_server.get_config_object().get_connections_count(); + rsp.new_connection_counter = new_connection_counter(); + rsp.close_connection_counter = close_connection_counter(); + LOG_PRINT_L0("Statistics: " << rsp.to_string()); + return 1; + } + + int handle_reset_statistics(int command, const CMD_RESET_STATISTICS::request&, CMD_RESET_STATISTICS::response& /*rsp*/, test_connection_context& /*context*/) + { + m_new_connection_counter.reset(); + m_new_connection_counter.inc(); + m_close_connection_counter.reset(); + return 1; + } + + int handle_start_open_close_test(int command, const CMD_START_OPEN_CLOSE_TEST::request& req, CMD_START_OPEN_CLOSE_TEST::response&, test_connection_context& context) + { + std::unique_lock<std::mutex> lock(m_open_close_test_mutex); + if (0 == m_open_close_test_helper.get()) + { + LOG_PRINT_L0("Start open/close test (" << req.open_request_target << ", " << req.max_opened_conn_count << ")"); + + m_open_close_test_conn_id = context.m_connection_id; + m_open_close_test_helper.reset(new open_close_test_helper(m_tcp_server, req.open_request_target, req.max_opened_conn_count)); + return 1; + } + else + { + return -1; + } + } + + int handle_shutdown(int command, const CMD_SHUTDOWN::request& req, test_connection_context& /*context*/) + { + LOG_PRINT_L0("Got shutdown requst. Shutting down..."); + m_tcp_server.send_stop_signal(); + return 1; + } + + int handle_send_data_requests(int /*command*/, const CMD_SEND_DATA_REQUESTS::request& req, test_connection_context& context) + { + boost::uuids::uuid cmd_conn_id = context.m_connection_id; + m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { + if (ctx.m_connection_id != cmd_conn_id) + { + CMD_DATA_REQUEST::request req2; + req2.data.resize(req.request_size); + + bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req2, + m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { + if (code <= 0) + { + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code); + } + }); + if (!r) + LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST"); + } + return true; + }); + + return 1; + } + + private: + void close_connections(boost::uuids::uuid cmd_conn_id) + { + LOG_PRINT_L0("Closing connections. Number of opened connections: " << m_tcp_server.get_config_object().get_connections_count()); + + size_t count = 0; + bool r = m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { + if (ctx.m_connection_id != cmd_conn_id) + { + ++count; + if (!ctx.m_closed) + { + ctx.m_closed = true; + m_tcp_server.get_config_object().close(ctx.m_connection_id); + } + else + { + LOG_PRINT_L0(count << " connection already closed"); + } + } + return true; + }); + + if (0 < count) + { + // Perhaps not all connections were closed, try to close it after 7 seconds + boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(m_tcp_server.get_io_service(), boost::posix_time::seconds(7))); + sh_deadline->async_wait([=](const boost::system::error_code& ec) + { + boost::shared_ptr<boost::asio::deadline_timer> t = sh_deadline; // Capture sh_deadline + if (!ec) + { + close_connections(cmd_conn_id); + } + else + { + LOG_PRINT_L0("ERROR: " << ec.message() << ':' << ec.value()); + } + }); + } + } + + private: + test_tcp_server& m_tcp_server; + + boost::uuids::uuid m_open_close_test_conn_id; + std::mutex m_open_close_test_mutex; + std::unique_ptr<open_close_test_helper> m_open_close_test_helper; + }; +} + +int main(int argc, char** argv) +{ + //set up logging options + epee::log_space::get_set_log_detalisation_level(true, LOG_LEVEL_0); + epee::log_space::log_singletone::add_logger(LOGGER_CONSOLE, NULL, NULL); + + size_t thread_count = (std::max)(min_thread_count, std::thread::hardware_concurrency() / 2); + + test_tcp_server tcp_server; + if (!tcp_server.init_server(srv_port, "127.0.0.1")) + return 1; + + srv_levin_commands_handler commands_handler(tcp_server); + tcp_server.get_config_object().m_pcommands_handler = &commands_handler; + tcp_server.get_config_object().m_invoke_timeout = 10000; + //tcp_server.get_config_object().m_max_packet_size = max_packet_size; + + if (!tcp_server.run_server(thread_count, true)) + return 2; + + return 0; +} |