aboutsummaryrefslogtreecommitdiff
path: root/tests/net_load_tests
diff options
context:
space:
mode:
authorAntonio Juarez <antonio.maria.juarez@live.com>2014-03-03 22:07:58 +0000
committerAntonio Juarez <antonio.maria.juarez@live.com>2014-03-03 22:07:58 +0000
commit296ae46ed8f8f6e5f986f978febad302e3df231a (patch)
tree1629164454a239308f33c9e12afb22e7f3cd8eeb /tests/net_load_tests
parentchanged name (diff)
downloadmonero-296ae46ed8f8f6e5f986f978febad302e3df231a.tar.xz
moved all stuff to github
Diffstat (limited to '')
-rw-r--r--tests/net_load_tests/clt.cpp603
-rw-r--r--tests/net_load_tests/net_load_tests.h329
-rw-r--r--tests/net_load_tests/srv.cpp211
3 files changed, 1143 insertions, 0 deletions
diff --git a/tests/net_load_tests/clt.cpp b/tests/net_load_tests/clt.cpp
new file mode 100644
index 000000000..45c1d0859
--- /dev/null
+++ b/tests/net_load_tests/clt.cpp
@@ -0,0 +1,603 @@
+// Copyright (c) 2012-2013 The Cryptonote developers
+// Distributed under the MIT/X11 software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include <atomic>
+#include <chrono>
+#include <functional>
+#include <numeric>
+#include <thread>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "include_base_utils.h"
+#include "misc_log_ex.h"
+#include "storages/levin_abstract_invoke2.h"
+
+#include "net_load_tests.h"
+
+using namespace net_load_tests;
+
+namespace
+{
+ const size_t CONNECTION_COUNT = 100000;
+ const size_t CONNECTION_TIMEOUT = 10000;
+ const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
+ const size_t RESERVED_CONN_CNT = 1;
+
+ template<typename t_predicate>
+ bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
+ {
+ for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
+ {
+ if (predicate())
+ return true;
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+ }
+ return false;
+ }
+
+ class t_connection_opener_1
+ {
+ public:
+ t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
+ : m_tcp_server(tcp_server)
+ , m_open_request_target(open_request_target)
+ , m_next_id(0)
+ , m_error_count(0)
+ , m_connections(open_request_target)
+ {
+ for (auto& conn_id : m_connections)
+ conn_id = boost::uuids::nil_uuid();
+ }
+
+ bool open()
+ {
+ size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
+ if (m_open_request_target <= id)
+ return false;
+
+ bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
+ if (!ec)
+ {
+ m_connections[id] = context.m_connection_id;
+ }
+ else
+ {
+ m_error_count.fetch_add(1, std::memory_order_relaxed);
+ }
+ });
+
+ if (!r)
+ {
+ m_error_count.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ return true;
+ }
+
+ bool close(size_t id)
+ {
+ if (!m_connections[id].is_nil())
+ {
+ m_tcp_server.get_config_object().close(m_connections[id]);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
+
+ private:
+ test_tcp_server& m_tcp_server;
+ size_t m_open_request_target;
+ std::atomic<size_t> m_next_id;
+ std::atomic<size_t> m_error_count;
+ std::vector<boost::uuids::uuid> m_connections;
+ };
+
+ class t_connection_opener_2
+ {
+ public:
+ t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
+ : m_tcp_server(tcp_server)
+ , m_open_request_target(open_request_target)
+ , m_open_request_count(0)
+ , m_error_count(0)
+ , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
+ {
+ }
+
+ bool open_and_close()
+ {
+ size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
+ if (m_open_request_target <= req_count)
+ return false;
+
+ bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
+ if (!ec)
+ {
+ m_open_close_test_helper.handle_new_connection(context.m_connection_id);
+ }
+ else
+ {
+ m_error_count.fetch_add(1, std::memory_order_relaxed);
+ }
+ });
+
+ if (!r)
+ {
+ m_error_count.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ return true;
+ }
+
+ void close_remaining_connections()
+ {
+ m_open_close_test_helper.close_remaining_connections();
+ }
+
+ size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
+ size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
+
+ private:
+ test_tcp_server& m_tcp_server;
+ size_t m_open_request_target;
+ std::atomic<size_t> m_open_request_count;
+ std::atomic<size_t> m_error_count;
+ open_close_test_helper m_open_close_test_helper;
+ };
+
+ class net_load_test_clt : public ::testing::Test
+ {
+ protected:
+ virtual void SetUp()
+ {
+ m_thread_count = (std::max)(min_thread_count, std::thread::hardware_concurrency() / 2);
+
+ m_tcp_server.get_config_object().m_pcommands_handler = &m_commands_handler;
+ m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
+
+ ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
+ ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));
+
+ // Connect to server
+ std::atomic<int> conn_status(0);
+ m_cmd_conn_id = boost::uuids::nil_uuid();
+ ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
+ if (!ec)
+ {
+ m_cmd_conn_id = context.m_connection_id;
+ }
+ else
+ {
+ LOG_ERROR("Connection error: " << ec.message());
+ }
+ conn_status.store(1, std::memory_order_seq_cst);
+ }));
+
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
+ ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
+ ASSERT_FALSE(m_cmd_conn_id.is_nil());
+
+ conn_status.store(0, std::memory_order_seq_cst);
+ CMD_RESET_STATISTICS::request req;
+ ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
+ m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
+ conn_status.store(code, std::memory_order_seq_cst);
+ }));
+
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
+ ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
+ }
+
+ virtual void TearDown()
+ {
+ m_tcp_server.send_stop_signal();
+ ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
+ }
+
+ static void TearDownTestCase()
+ {
+ // Stop server
+ test_levin_commands_handler commands_handler;
+ test_tcp_server tcp_server;
+ tcp_server.get_config_object().m_pcommands_handler = &commands_handler;
+ tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
+
+ if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
+ if (!tcp_server.run_server(2, false)) return;
+
+ // Connect to server and invoke shutdown command
+ std::atomic<int> conn_status(0);
+ boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
+ tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
+ cmd_conn_id = context.m_connection_id;
+ conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
+ });
+
+ if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
+ if (1 != conn_status.load(std::memory_order_seq_cst)) return;
+
+ epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());
+
+ busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
+ }
+
+ template<typename Func>
+ static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
+ {
+ func();
+ }
+
+ template<typename Func>
+ static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
+ {
+ func(thread_index);
+ }
+
+ template<typename Func>
+ void parallel_exec(const Func& func)
+ {
+ unit_test::call_counter properly_finished_threads;
+ std::vector<std::thread> threads(m_thread_count);
+ for (size_t i = 0; i < threads.size(); ++i)
+ {
+ threads[i] = std::thread([&, i] {
+ call_func(i, func, 0);
+ properly_finished_threads.inc();
+ });
+ }
+
+ for (auto& th : threads)
+ th.join();
+
+ ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
+ }
+
+ void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
+ {
+ std::atomic<int> req_status(0);
+ CMD_GET_STATISTICS::request req;
+ ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
+ m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
+ if (0 < code)
+ {
+ statistics = rsp;
+ }
+ else
+ {
+ LOG_ERROR("Get server statistics error: " << code);
+ }
+ req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
+ }));
+
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
+ ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
+ }
+
+ template <typename t_predicate>
+ bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
+ {
+ for (size_t i = 0; i < 30; ++i)
+ {
+ get_server_statistics(statistics);
+ if (predicate(statistics))
+ {
+ return true;
+ }
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+
+ return false;
+ }
+
+ void ask_for_data_requests(size_t request_size = 0)
+ {
+ CMD_SEND_DATA_REQUESTS::request req;
+ req.request_size = request_size;
+ epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
+ }
+
+ protected:
+ test_tcp_server m_tcp_server;
+ test_levin_commands_handler m_commands_handler;
+ size_t m_thread_count;
+ boost::uuids::uuid m_cmd_conn_id;
+ };
+}
+
+TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
+{
+ // Open connections
+ t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
+ parallel_exec([&] {
+ while (connection_opener.open());
+ });
+
+ // Wait for all open requests to complete
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
+ LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
+ " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
+
+ // Check
+ ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
+
+ // Close connections
+ parallel_exec([&](size_t thread_idx) {
+ for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
+ {
+ connection_opener.close(i);
+ }
+ });
+
+ // Wait for all opened connections to close
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
+ LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
+ " / " << m_commands_handler.close_connection_counter());
+
+ // Check all connections are closed
+ ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
+ ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+
+ // Wait for server to handle all open and close requests
+ CMD_GET_STATISTICS::response srv_stat;
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status
+ // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
+ ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+
+ // Request data from server, it causes to close rest connections
+ ask_for_data_requests();
+
+ // Wait for server to close rest connections
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status. All connections should be closed
+ ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+}
+
+TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
+{
+ // Open connections
+ t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
+ parallel_exec([&] {
+ while (connection_opener.open());
+ });
+
+ // Wait for all open requests to complete
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
+ LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
+ " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
+
+ // Check
+ ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
+
+ // Wait for server accepts all connections
+ CMD_GET_STATISTICS::response srv_stat;
+ int last_new_connection_counter = -1;
+ busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
+ if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
+ else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
+ });
+
+ // Close connections
+ CMD_CLOSE_ALL_CONNECTIONS::request req;
+ ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
+
+ // Wait for all opened connections to close
+ busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
+ LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
+ " / " << m_commands_handler.close_connection_counter());
+
+ // It's OK, if server didn't close all connections, because it could accept not all our connections
+ ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
+ ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+
+ // Wait for server to handle all open and close requests
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status
+ ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+
+ // Close rest connections
+ m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
+ if (ctx.m_connection_id != m_cmd_conn_id)
+ {
+ CMD_DATA_REQUEST::request req;
+ bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
+ m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
+ if (code <= 0)
+ {
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
+ }
+ });
+ if (!r)
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
+ }
+ return true;
+ });
+
+ // Wait for all opened connections to close
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
+ LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
+ " / " << m_commands_handler.close_connection_counter());
+
+ // Check
+ ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+}
+
+TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
+{
+ static const size_t MAX_OPENED_CONN_COUNT = 100;
+
+ // Open/close connections
+ t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
+ parallel_exec([&] {
+ while (connection_opener.open_and_close());
+ });
+
+ // Wait for all open requests to complete
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
+ LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
+ " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
+
+ // Check
+ ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
+
+ // Wait for all close requests to complete
+ EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
+ LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
+
+ // Check
+ ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
+
+ connection_opener.close_remaining_connections();
+
+ // Wait for all close requests to complete
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
+ LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
+
+ ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
+ ASSERT_EQ(0, connection_opener.opened_connection_count());
+ ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+
+ // Wait for server to handle all open and close requests
+ CMD_GET_STATISTICS::response srv_stat;
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status
+ // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
+ ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+
+ // Request data from server, it causes to close rest connections
+ ask_for_data_requests();
+
+ // Wait for server to close rest connections
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status. All connections should be closed
+ ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+}
+
+TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
+{
+ static const size_t MAX_OPENED_CONN_COUNT = 100;
+
+ // Init test
+ std::atomic<int> test_state(0);
+ CMD_START_OPEN_CLOSE_TEST::request req_start;
+ req_start.open_request_target = CONNECTION_COUNT;
+ req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
+ ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
+ m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
+ test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
+ }));
+
+ // Wait for server response
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
+ ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
+
+ // Open connections
+ t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
+ parallel_exec([&] {
+ while (connection_opener.open());
+ });
+
+ // Wait for all open requests to complete
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
+ LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
+ " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
+ LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
+
+ ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
+ ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
+
+ // Wait for server accepts all connections
+ CMD_GET_STATISTICS::response srv_stat;
+ int last_new_connection_counter = -1;
+ busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
+ if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
+ else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
+ });
+
+ // Ask server to close rest connections
+ CMD_CLOSE_ALL_CONNECTIONS::request req;
+ ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
+
+ // Wait for almost all connections to be closed by server
+ busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
+
+ // It's OK, if there are opened connections, because server could accept not all our connections
+ ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
+ ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+
+ // Wait for server to handle all open and close requests
+ busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
+ LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
+
+ // Check server status
+ ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
+
+ // Close rest connections
+ m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
+ if (ctx.m_connection_id != m_cmd_conn_id)
+ {
+ CMD_DATA_REQUEST::request req;
+ bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
+ m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
+ if (code <= 0)
+ {
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
+ }
+ });
+ if (!r)
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
+ }
+ return true;
+ });
+
+ // Wait for all opened connections to close
+ EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
+ LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
+ " / " << m_commands_handler.close_connection_counter());
+
+ // Check
+ ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
+ ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
+}
+
+int main(int argc, char** argv)
+{
+ epee::debug::get_set_enable_assert(true, false);
+ //set up logging options
+ epee::log_space::get_set_log_detalisation_level(true, LOG_LEVEL_0);
+ epee::log_space::log_singletone::add_logger(LOGGER_CONSOLE, NULL, NULL);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tests/net_load_tests/net_load_tests.h b/tests/net_load_tests/net_load_tests.h
new file mode 100644
index 000000000..20da11bf3
--- /dev/null
+++ b/tests/net_load_tests/net_load_tests.h
@@ -0,0 +1,329 @@
+// Copyright (c) 2012-2013 The Cryptonote developers
+// Distributed under the MIT/X11 software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#pragma once
+
+#include <atomic>
+
+#include <boost/asio/io_service.hpp>
+
+#include "include_base_utils.h"
+#include "string_tools.h"
+#include "net/levin_protocol_handler_async.h"
+#include "net/abstract_tcp_server2.h"
+#include "serialization/keyvalue_serialization.h"
+
+#include "../unit_tests/unit_tests_utils.h"
+
+namespace net_load_tests
+{
+ struct test_connection_context : epee::net_utils::connection_context_base
+ {
+ volatile bool m_closed;
+ };
+
+ typedef epee::levin::async_protocol_handler<test_connection_context> test_levin_protocol_handler;
+ typedef epee::levin::async_protocol_handler_config<test_connection_context> test_levin_protocol_handler_config;
+ typedef epee::net_utils::connection<test_levin_protocol_handler> test_connection;
+ typedef epee::net_utils::boosted_tcp_server<test_levin_protocol_handler> test_tcp_server;
+
+ struct test_levin_commands_handler : public epee::levin::levin_commands_handler<test_connection_context>
+ {
+ test_levin_commands_handler()
+ //: m_return_code(LEVIN_OK)
+ //, m_last_command(-1)
+ {
+ }
+
+ virtual int invoke(int command, const std::string& in_buff, std::string& buff_out, test_connection_context& context)
+ {
+ //m_invoke_counter.inc();
+ //std::unique_lock<std::mutex> lock(m_mutex);
+ //m_last_command = command;
+ //m_last_in_buf = in_buff;
+ //buff_out = m_invoke_out_buf;
+ //return m_return_code;
+ return LEVIN_OK;
+ }
+
+ virtual int notify(int command, const std::string& in_buff, test_connection_context& context)
+ {
+ //m_notify_counter.inc();
+ //std::unique_lock<std::mutex> lock(m_mutex);
+ //m_last_command = command;
+ //m_last_in_buf = in_buff;
+ //return m_return_code;
+ return LEVIN_OK;
+ }
+
+ virtual void callback(test_connection_context& context)
+ {
+ //m_callback_counter.inc();
+ //std::cout << "test_levin_commands_handler::callback()" << std::endl;
+ }
+
+ virtual void on_connection_new(test_connection_context& context)
+ {
+ m_new_connection_counter.inc();
+ //std::cout << "test_levin_commands_handler::on_connection_new()" << std::endl;
+ }
+
+ virtual void on_connection_close(test_connection_context& context)
+ {
+ m_close_connection_counter.inc();
+ //std::cout << "test_levin_commands_handler::on_connection_close()" << std::endl;
+ }
+
+ //size_t invoke_counter() const { return m_invoke_counter.get(); }
+ //size_t notify_counter() const { return m_notify_counter.get(); }
+ //size_t callback_counter() const { return m_callback_counter.get(); }
+ size_t new_connection_counter() const { return m_new_connection_counter.get(); }
+ size_t close_connection_counter() const { return m_close_connection_counter.get(); }
+
+ //int return_code() const { return m_return_code; }
+ //void return_code(int v) { m_return_code = v; }
+
+ //const std::string& invoke_out_buf() const { return m_invoke_out_buf; }
+ //void invoke_out_buf(const std::string& v) { m_invoke_out_buf = v; }
+
+ //int last_command() const { return m_last_command; }
+ //const std::string& last_in_buf() const { return m_last_in_buf; }
+
+ protected:
+ //unit_test::call_counter m_invoke_counter;
+ //unit_test::call_counter m_notify_counter;
+ //unit_test::call_counter m_callback_counter;
+ unit_test::call_counter m_new_connection_counter;
+ unit_test::call_counter m_close_connection_counter;
+
+ //std::mutex m_mutex;
+
+ //int m_return_code;
+ //std::string m_invoke_out_buf;
+
+ //int m_last_command;
+ //std::string m_last_in_buf;
+ };
+
+ class open_close_test_helper
+ {
+ public:
+ open_close_test_helper(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
+ : m_tcp_server(tcp_server)
+ , m_open_request_target(open_request_target)
+ , m_max_opened_connection_count(max_opened_connection_count)
+ , m_opened_connection_count(0)
+ , m_next_opened_conn_idx(0)
+ , m_next_closed_conn_idx(0)
+ , m_connections(open_request_target)
+ {
+ for (auto& conn_id : m_connections)
+ conn_id = boost::uuids::nil_uuid();
+ }
+
+ bool handle_new_connection(const boost::uuids::uuid& connection_id, bool ignore_close_fails = false)
+ {
+ size_t idx = m_next_opened_conn_idx.fetch_add(1, std::memory_order_relaxed);
+ m_connections[idx] = connection_id;
+
+ size_t prev_connection_count = m_opened_connection_count.fetch_add(1, std::memory_order_relaxed);
+ if (m_max_opened_connection_count <= prev_connection_count)
+ {
+ return close_next_connection(ignore_close_fails);
+ }
+
+ return true;
+ }
+
+ void close_remaining_connections()
+ {
+ while (close_next_connection(false));
+ }
+
+ bool close_next_connection(bool ignore_close_fails)
+ {
+ size_t idx = m_next_closed_conn_idx.fetch_add(1, std::memory_order_relaxed);
+ if (m_next_opened_conn_idx.load(std::memory_order_relaxed) <= idx)
+ {
+ LOG_PRINT_L0("Not enough opened connections");
+ return false;
+ }
+ if (m_connections[idx].is_nil())
+ {
+ LOG_PRINT_L0("Connection isn't opened");
+ return false;
+ }
+ if (!m_tcp_server.get_config_object().close(m_connections[idx]))
+ {
+ LOG_PRINT_L0("Close connection error: " << m_connections[idx]);
+ if (!ignore_close_fails)
+ {
+ return false;
+ }
+ }
+
+ m_connections[idx] = boost::uuids::nil_uuid();
+ m_opened_connection_count.fetch_sub(1, std::memory_order_relaxed);
+ return true;
+ }
+
+ size_t opened_connection_count() const { return m_opened_connection_count.load(std::memory_order_relaxed); }
+
+ private:
+ test_tcp_server& m_tcp_server;
+ size_t m_open_request_target;
+ size_t m_max_opened_connection_count;
+ std::atomic<size_t> m_opened_connection_count;
+ std::atomic<size_t> m_next_opened_conn_idx;
+ std::atomic<size_t> m_next_closed_conn_idx;
+ std::vector<boost::uuids::uuid> m_connections;
+ };
+
+ const unsigned int min_thread_count = 2;
+ const std::string clt_port("36230");
+ const std::string srv_port("36231");
+
+ enum command_ids
+ {
+ cmd_close_all_connections_id = 73564,
+ cmd_start_open_close_test_id,
+ cmd_get_statistics_id,
+ cmd_reset_statistics_id,
+ cmd_shutdown_id,
+ cmd_send_data_requests_id,
+ cmd_data_request_id
+ };
+
+ struct CMD_CLOSE_ALL_CONNECTIONS
+ {
+ const static int ID = cmd_close_all_connections_id;
+
+ struct request
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+
+ struct CMD_START_OPEN_CLOSE_TEST
+ {
+ const static int ID = cmd_start_open_close_test_id;
+
+ struct request
+ {
+ size_t open_request_target;
+ size_t max_opened_conn_count;
+
+ BEGIN_KV_SERIALIZE_MAP()
+ KV_SERIALIZE(open_request_target)
+ KV_SERIALIZE(max_opened_conn_count)
+ END_KV_SERIALIZE_MAP()
+ };
+
+ struct response
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+
+ struct CMD_GET_STATISTICS
+ {
+ const static int ID = cmd_get_statistics_id;
+
+ struct request
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+
+ struct response
+ {
+ size_t opened_connections_count;
+ size_t new_connection_counter;
+ size_t close_connection_counter;
+
+ BEGIN_KV_SERIALIZE_MAP()
+ KV_SERIALIZE(opened_connections_count)
+ KV_SERIALIZE(new_connection_counter)
+ KV_SERIALIZE(close_connection_counter)
+ END_KV_SERIALIZE_MAP()
+
+ std::string to_string() const
+ {
+ std::stringstream ss;
+ ss << "opened_connections_count = " << opened_connections_count <<
+ ", new_connection_counter = " << new_connection_counter <<
+ ", close_connection_counter = " << close_connection_counter;
+ return ss.str();
+ }
+ };
+ };
+
+ struct CMD_RESET_STATISTICS
+ {
+ const static int ID = cmd_reset_statistics_id;
+
+ struct request
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+
+ struct response
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+
+ struct CMD_SHUTDOWN
+ {
+ const static int ID = cmd_shutdown_id;
+
+ struct request
+ {
+ BEGIN_KV_SERIALIZE_MAP()
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+
+ struct CMD_SEND_DATA_REQUESTS
+ {
+ const static int ID = cmd_send_data_requests_id;
+
+ struct request
+ {
+ size_t request_size;
+
+ BEGIN_KV_SERIALIZE_MAP()
+ KV_SERIALIZE(request_size)
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+
+ struct CMD_DATA_REQUEST
+ {
+ const static int ID = cmd_data_request_id;
+
+ struct request
+ {
+ std::string data;
+ size_t response_size;
+
+ BEGIN_KV_SERIALIZE_MAP()
+ KV_SERIALIZE(data)
+ END_KV_SERIALIZE_MAP()
+ };
+
+ struct response
+ {
+ std::string data;
+
+ BEGIN_KV_SERIALIZE_MAP()
+ KV_SERIALIZE(data)
+ END_KV_SERIALIZE_MAP()
+ };
+ };
+}
diff --git a/tests/net_load_tests/srv.cpp b/tests/net_load_tests/srv.cpp
new file mode 100644
index 000000000..52895c9dd
--- /dev/null
+++ b/tests/net_load_tests/srv.cpp
@@ -0,0 +1,211 @@
+// Copyright (c) 2012-2013 The Cryptonote developers
+// Distributed under the MIT/X11 software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include <mutex>
+#include <thread>
+
+#include "include_base_utils.h"
+#include "misc_log_ex.h"
+#include "storages/levin_abstract_invoke2.h"
+
+#include "net_load_tests.h"
+
+using namespace net_load_tests;
+
+#define EXIT_ON_ERROR(cond) { if (!(cond)) { LOG_PRINT_L0("ERROR: " << #cond); exit(1); } else {} }
+
+namespace
+{
+ struct srv_levin_commands_handler : public test_levin_commands_handler
+ {
+ srv_levin_commands_handler(test_tcp_server& tcp_server)
+ : m_tcp_server(tcp_server)
+ , m_open_close_test_conn_id(boost::uuids::nil_uuid())
+ {
+ }
+
+ virtual void on_connection_new(test_connection_context& context)
+ {
+ test_levin_commands_handler::on_connection_new(context);
+ context.m_closed = false;
+
+ //std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+ std::unique_lock<std::mutex> lock(m_open_close_test_mutex);
+ if (!m_open_close_test_conn_id.is_nil())
+ {
+ EXIT_ON_ERROR(m_open_close_test_helper->handle_new_connection(context.m_connection_id, true));
+ }
+ }
+
+ virtual void on_connection_close(test_connection_context& context)
+ {
+ test_levin_commands_handler::on_connection_close(context);
+
+ std::unique_lock<std::mutex> lock(m_open_close_test_mutex);
+ if (context.m_connection_id == m_open_close_test_conn_id)
+ {
+ LOG_PRINT_L0("Stop open/close test");
+ m_open_close_test_conn_id = boost::uuids::nil_uuid();
+ m_open_close_test_helper.reset(0);
+ }
+ }
+
+ CHAIN_LEVIN_INVOKE_MAP2(test_connection_context);
+ CHAIN_LEVIN_NOTIFY_MAP2(test_connection_context);
+
+ BEGIN_INVOKE_MAP2(srv_levin_commands_handler)
+ HANDLE_NOTIFY_T2(CMD_CLOSE_ALL_CONNECTIONS, &srv_levin_commands_handler::handle_close_all_connections)
+ HANDLE_NOTIFY_T2(CMD_SHUTDOWN, &srv_levin_commands_handler::handle_shutdown)
+ HANDLE_NOTIFY_T2(CMD_SEND_DATA_REQUESTS, &srv_levin_commands_handler::handle_send_data_requests)
+ HANDLE_INVOKE_T2(CMD_GET_STATISTICS, &srv_levin_commands_handler::handle_get_statistics)
+ HANDLE_INVOKE_T2(CMD_RESET_STATISTICS, &srv_levin_commands_handler::handle_reset_statistics)
+ HANDLE_INVOKE_T2(CMD_START_OPEN_CLOSE_TEST, &srv_levin_commands_handler::handle_start_open_close_test)
+ END_INVOKE_MAP2()
+
+ int handle_close_all_connections(int command, const CMD_CLOSE_ALL_CONNECTIONS::request& req, test_connection_context& context)
+ {
+ close_connections(context.m_connection_id);
+ return 1;
+ }
+
+ int handle_get_statistics(int command, const CMD_GET_STATISTICS::request&, CMD_GET_STATISTICS::response& rsp, test_connection_context& /*context*/)
+ {
+ rsp.opened_connections_count = m_tcp_server.get_config_object().get_connections_count();
+ rsp.new_connection_counter = new_connection_counter();
+ rsp.close_connection_counter = close_connection_counter();
+ LOG_PRINT_L0("Statistics: " << rsp.to_string());
+ return 1;
+ }
+
+ int handle_reset_statistics(int command, const CMD_RESET_STATISTICS::request&, CMD_RESET_STATISTICS::response& /*rsp*/, test_connection_context& /*context*/)
+ {
+ m_new_connection_counter.reset();
+ m_new_connection_counter.inc();
+ m_close_connection_counter.reset();
+ return 1;
+ }
+
+ int handle_start_open_close_test(int command, const CMD_START_OPEN_CLOSE_TEST::request& req, CMD_START_OPEN_CLOSE_TEST::response&, test_connection_context& context)
+ {
+ std::unique_lock<std::mutex> lock(m_open_close_test_mutex);
+ if (0 == m_open_close_test_helper.get())
+ {
+ LOG_PRINT_L0("Start open/close test (" << req.open_request_target << ", " << req.max_opened_conn_count << ")");
+
+ m_open_close_test_conn_id = context.m_connection_id;
+ m_open_close_test_helper.reset(new open_close_test_helper(m_tcp_server, req.open_request_target, req.max_opened_conn_count));
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ int handle_shutdown(int command, const CMD_SHUTDOWN::request& req, test_connection_context& /*context*/)
+ {
+ LOG_PRINT_L0("Got shutdown requst. Shutting down...");
+ m_tcp_server.send_stop_signal();
+ return 1;
+ }
+
+ int handle_send_data_requests(int /*command*/, const CMD_SEND_DATA_REQUESTS::request& req, test_connection_context& context)
+ {
+ boost::uuids::uuid cmd_conn_id = context.m_connection_id;
+ m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
+ if (ctx.m_connection_id != cmd_conn_id)
+ {
+ CMD_DATA_REQUEST::request req2;
+ req2.data.resize(req.request_size);
+
+ bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req2,
+ m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
+ if (code <= 0)
+ {
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
+ }
+ });
+ if (!r)
+ LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
+ }
+ return true;
+ });
+
+ return 1;
+ }
+
+ private:
+ void close_connections(boost::uuids::uuid cmd_conn_id)
+ {
+ LOG_PRINT_L0("Closing connections. Number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
+
+ size_t count = 0;
+ bool r = m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
+ if (ctx.m_connection_id != cmd_conn_id)
+ {
+ ++count;
+ if (!ctx.m_closed)
+ {
+ ctx.m_closed = true;
+ m_tcp_server.get_config_object().close(ctx.m_connection_id);
+ }
+ else
+ {
+ LOG_PRINT_L0(count << " connection already closed");
+ }
+ }
+ return true;
+ });
+
+ if (0 < count)
+ {
+ // Perhaps not all connections were closed, try to close it after 7 seconds
+ boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(m_tcp_server.get_io_service(), boost::posix_time::seconds(7)));
+ sh_deadline->async_wait([=](const boost::system::error_code& ec)
+ {
+ boost::shared_ptr<boost::asio::deadline_timer> t = sh_deadline; // Capture sh_deadline
+ if (!ec)
+ {
+ close_connections(cmd_conn_id);
+ }
+ else
+ {
+ LOG_PRINT_L0("ERROR: " << ec.message() << ':' << ec.value());
+ }
+ });
+ }
+ }
+
+ private:
+ test_tcp_server& m_tcp_server;
+
+ boost::uuids::uuid m_open_close_test_conn_id;
+ std::mutex m_open_close_test_mutex;
+ std::unique_ptr<open_close_test_helper> m_open_close_test_helper;
+ };
+}
+
+int main(int argc, char** argv)
+{
+ //set up logging options
+ epee::log_space::get_set_log_detalisation_level(true, LOG_LEVEL_0);
+ epee::log_space::log_singletone::add_logger(LOGGER_CONSOLE, NULL, NULL);
+
+ size_t thread_count = (std::max)(min_thread_count, std::thread::hardware_concurrency() / 2);
+
+ test_tcp_server tcp_server;
+ if (!tcp_server.init_server(srv_port, "127.0.0.1"))
+ return 1;
+
+ srv_levin_commands_handler commands_handler(tcp_server);
+ tcp_server.get_config_object().m_pcommands_handler = &commands_handler;
+ tcp_server.get_config_object().m_invoke_timeout = 10000;
+ //tcp_server.get_config_object().m_max_packet_size = max_packet_size;
+
+ if (!tcp_server.run_server(thread_count, true))
+ return 2;
+
+ return 0;
+}