From f59b1d5fb55cc100d646a66a67900df0e15b3ecf Mon Sep 17 00:00:00 2001 From: anon Date: Fri, 12 Mar 2021 11:59:05 +0000 Subject: async_protocol_handler_config: add deadlock demo --- tests/unit_tests/epee_boosted_tcp_server.cpp | 110 +++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/tests/unit_tests/epee_boosted_tcp_server.cpp b/tests/unit_tests/epee_boosted_tcp_server.cpp index 3e8c889e2..b8481bf0b 100644 --- a/tests/unit_tests/epee_boosted_tcp_server.cpp +++ b/tests/unit_tests/epee_boosted_tcp_server.cpp @@ -170,6 +170,7 @@ TEST(test_epee_connection, test_lifetime) using connection_ptr = boost::shared_ptr; using shared_state_t = typename connection_t::shared_state; using shared_state_ptr = std::shared_ptr; + using shared_states_t = std::vector; using tag_t = boost::uuids::uuid; using tags_t = std::vector; using io_context_t = boost::asio::io_service; @@ -344,6 +345,115 @@ TEST(test_epee_connection, test_lifetime) shared_state->del_out_connections(1); ASSERT_TRUE(shared_state->get_connections_count() == 0); } + + shared_states_t shared_states; + while (shared_states.size() < 2) { + shared_states.emplace_back(std::make_shared()); + shared_states.back()->set_handler(new command_handler_t(ZERO_DELAY, + [&shared_states]{ + for (auto &s: shared_states) { + auto success = s->foreach_connection([](context_t&){ + return true; + }); + ASSERT_TRUE(success); + } + } + ), + &command_handler_t::destroy + ); + } + workers_t workers; + + for (auto &s: shared_states) { + workers.emplace_back([&io_context, &s, &endpoint]{ + for (auto i = 0; i < N * N; ++i) { + connection_ptr conn(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + io_context.post([conn]{ + conn->cancel(); + }); + conn.reset(); + s->del_out_connections(1); + while (s->sock_count); + } + }); + } + for (;workers.size(); workers.pop_back()) + workers.back().join(); + + for (auto &s: shared_states) { + workers.emplace_back([&io_context, &s, &endpoint]{ + for (auto i = 0; i < N * N; ++i) { + connection_ptr conn(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + conn->cancel(); + while (conn.use_count() > 1); + s->foreach_connection([&io_context, &s, &endpoint, &conn](context_t& context){ + conn.reset(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + conn->cancel(); + while (conn.use_count() > 1); + conn.reset(); + return true; + }); + while (s->sock_count); + } + }); + } + for (;workers.size(); workers.pop_back()) + workers.back().join(); + + for (auto &s: shared_states) { + workers.emplace_back([&io_context, &s, &endpoint]{ + for (auto i = 0; i < N; ++i) { + connection_ptr conn(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + context_t context; + conn->get_context(context); + auto tag = context.m_connection_id; + conn->cancel(); + while (conn.use_count() > 1); + s->for_connection(tag, [&io_context, &s, &endpoint, &conn](context_t& context){ + conn.reset(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + conn->cancel(); + while (conn.use_count() > 1); + conn.reset(); + return true; + }); + while (s->sock_count); + } + }); + } + for (;workers.size(); workers.pop_back()) + workers.back().join(); + + for (auto &s: shared_states) { + workers.emplace_back([&io_context, &s, &endpoint]{ + for (auto i = 0; i < N; ++i) { + connection_ptr conn(new connection_t(io_context, s, {}, {})); + conn->socket().connect(endpoint); + conn->start({}, {}); + context_t context; + conn->get_context(context); + auto tag = context.m_connection_id; + io_context.post([conn]{ + conn->cancel(); + }); + conn.reset(); + s->close(tag); + while (s->sock_count); + } + }); + } + for (;workers.size(); workers.pop_back()) + workers.back().join(); + }); for (auto& w: workers) { -- cgit v1.2.3 From d5b78c08b6550a3f99d50ce8bb873b6da5f39eee Mon Sep 17 00:00:00 2001 From: anon Date: Fri, 12 Mar 2021 11:59:05 +0000 Subject: async_protocol_handler_config: fix deadlock --- .../include/net/levin_protocol_handler_async.h | 52 ++++++++++------------ 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 762537e3f..f2bf174cb 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -800,36 +800,32 @@ void async_protocol_handler_config::del_connection(async_p template void async_protocol_handler_config::delete_connections(size_t count, bool incoming) { - std::vector connections; + std::vector connections; + + auto scope_exit_handler = misc_utils::create_scope_leave_handler([&connections]{ + for (auto &aph: connections) + aph->finish_outer_call(); + }); + CRITICAL_REGION_BEGIN(m_connects_lock); for (auto& c: m_connects) { if (c.second->m_connection_context.m_is_income == incoming) - connections.push_back(c.first); + if (c.second->start_outer_call()) + connections.push_back(c.second); } // close random connections from the provided set // TODO or better just keep removing random elements (performance) unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); shuffle(connections.begin(), connections.end(), std::default_random_engine(seed)); - while (count > 0 && connections.size() > 0) - { - try - { - auto i = connections.end() - 1; - async_protocol_handler *conn = m_connects.at(*i); - m_connects.erase(*i); - conn->close(); - connections.erase(i); - } - catch (const std::out_of_range &e) - { - MWARNING("Connection not found in m_connects, continuing"); - } - --count; - } + for (size_t i = 0; i < connections.size() && i < count; ++i) + m_connects.erase(connections[i]->get_connection_id()); CRITICAL_REGION_END(); + + for (size_t i = 0; i < connections.size() && i < count; ++i) + connections[i]->close(); } //------------------------------------------------------------------------------------------ template @@ -891,18 +887,19 @@ int async_protocol_handler_config::invoke_async(int comman template template bool async_protocol_handler_config::foreach_connection(const callback_t &cb) { - CRITICAL_REGION_LOCAL(m_connects_lock); std::vector conn; - conn.reserve(m_connects.size()); auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{ for (auto &aph: conn) aph->finish_outer_call(); }); + CRITICAL_REGION_BEGIN(m_connects_lock); + conn.reserve(m_connects.size()); for (auto &e: m_connects) if (e.second->start_outer_call()) conn.push_back(e.second); + CRITICAL_REGION_END() for (auto &aph: conn) if (!cb(aph->get_context_ref())) @@ -914,11 +911,8 @@ bool async_protocol_handler_config::foreach_connection(con template template bool async_protocol_handler_config::for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb) { - CRITICAL_REGION_LOCAL(m_connects_lock); - async_protocol_handler* aph = find_connection(connection_id); - if (!aph) - return false; - if (!aph->start_outer_call()) + async_protocol_handler* aph = nullptr; + if (find_and_lock_connection(connection_id, aph) != LEVIN_OK) return false; auto scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, aph)); @@ -984,12 +978,14 @@ int async_protocol_handler_config::send(byte_slice message template bool async_protocol_handler_config::close(boost::uuids::uuid connection_id) { - CRITICAL_REGION_LOCAL(m_connects_lock); - async_protocol_handler* aph = find_connection(connection_id); - if (!aph) + async_protocol_handler* aph = nullptr; + if (find_and_lock_connection(connection_id, aph) != LEVIN_OK) return false; + auto scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, aph)); if (!aph->close()) return false; + CRITICAL_REGION_LOCAL(m_connects_lock); m_connects.erase(connection_id); return true; } -- cgit v1.2.3