aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/abstract_tcp_server2.inl
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl811
1 files changed, 811 insertions, 0 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
new file mode 100644
index 000000000..1d6a1662f
--- /dev/null
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -0,0 +1,811 @@
+// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the Andrey N. Sabelnikov nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+
+
+#include "net_utils_base.h"
+#include <boost/lambda/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/uuid/random_generator.hpp>
+#include <boost/chrono.hpp>
+#include <boost/utility/value_init.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include "misc_language.h"
+#include "pragma_comp_defs.h"
+
+PRAGMA_WARNING_PUSH
+namespace epee
+{
+namespace net_utils
+{
+ /************************************************************************/
+ /* */
+ /************************************************************************/
+PRAGMA_WARNING_DISABLE_VS(4355)
+
+ template<class t_protocol_handler>
+ connection<t_protocol_handler>::connection(boost::asio::io_service& io_service,
+ typename t_protocol_handler::config_type& config, volatile boost::uint32_t& sock_count, i_connection_filter* &pfilter)
+ : strand_(io_service),
+ socket_(io_service),
+ //context(typename boost::value_initialized<t_connection_context>())
+ m_protocol_handler(this, config, context),
+ m_want_close_connection(0),
+ m_was_shutdown(0),
+ m_ref_sockets_count(sock_count),
+ m_pfilter(pfilter)
+ {
+ boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count);
+ }
+PRAGMA_WARNING_DISABLE_VS(4355)
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ connection<t_protocol_handler>::~connection()
+ {
+ if(!m_was_shutdown)
+ {
+ LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown.");
+ shutdown();
+ }
+
+ LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed");
+ boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ boost::asio::ip::tcp::socket& connection<t_protocol_handler>::socket()
+ {
+ return socket_;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ boost::shared_ptr<connection<t_protocol_handler> > connection<t_protocol_handler>::safe_shared_from_this()
+ {
+ try
+ {
+ return connection<t_protocol_handler>::shared_from_this();
+ }
+ catch (const boost::bad_weak_ptr&)
+ {
+ // It happens when the connection is being deleted
+ return boost::shared_ptr<connection<t_protocol_handler> >();
+ }
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::start(bool is_income, bool is_multithreaded)
+ {
+ TRY_ENTRY();
+
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if(!self)
+ return false;
+
+ m_is_multithreaded = is_multithreaded;
+
+ boost::system::error_code ec;
+ auto remote_ep = socket_.remote_endpoint(ec);
+ CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value());
+
+ auto local_ep = socket_.local_endpoint(ec);
+ CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value());
+
+ context = boost::value_initialized<t_connection_context>();
+ long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong());
+
+ context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income);
+ LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
+ " to " << local_ep.address().to_string() << ':' << local_ep.port() <<
+ ", total sockets objects " << m_ref_sockets_count);
+
+ if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip))
+ {
+ LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection");
+ close();
+ return false;
+ }
+
+ m_protocol_handler.after_init_connection();
+
+ socket_.async_read_some(boost::asio::buffer(buffer_),
+ strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_read, self,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)));
+
+ return true;
+
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::request_callback()
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback");
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if(!self)
+ return false;
+
+ strand_.post(boost::bind(&connection<t_protocol_handler>::call_back_starter, self));
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::request_callback()", false);
+ return true;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ boost::asio::io_service& connection<t_protocol_handler>::get_io_service()
+ {
+ return socket_.get_io_service();
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::add_ref()
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref");
+ CRITICAL_REGION_LOCAL(m_self_refs_lock);
+
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if(!self)
+ return false;
+ if(m_was_shutdown)
+ return false;
+ m_self_refs.push_back(self);
+ return true;
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::add_ref()", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::release()
+ {
+ TRY_ENTRY();
+ boost::shared_ptr<connection<t_protocol_handler> > back_connection_copy;
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] release");
+ CRITICAL_REGION_BEGIN(m_self_refs_lock);
+ CHECK_AND_ASSERT_MES(m_self_refs.size(), false, "[sock " << socket_.native_handle() << "] m_self_refs empty at connection<t_protocol_handler>::release() call");
+ //erasing from container without additional copy can cause start deleting object, including m_self_refs
+ back_connection_copy = m_self_refs.back();
+ m_self_refs.pop_back();
+ CRITICAL_REGION_END();
+ return true;
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::release()", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void connection<t_protocol_handler>::call_back_starter()
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback");
+ m_protocol_handler.handle_qued_callback();
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void());
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void connection<t_protocol_handler>::handle_read(const boost::system::error_code& e,
+ std::size_t bytes_transferred)
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback.");
+
+ if (!e)
+ {
+ LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4);
+ bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred);
+ if(!recv_res)
+ {
+ LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4);
+
+ //some error in protocol, protocol handler ask to close connection
+ boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1);
+ bool do_shutdown = false;
+ CRITICAL_REGION_BEGIN(m_send_que_lock);
+ if(!m_send_que.size())
+ do_shutdown = true;
+ CRITICAL_REGION_END();
+ if(do_shutdown)
+ shutdown();
+ }else
+ {
+ socket_.async_read_some(boost::asio::buffer(buffer_),
+ strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)));
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested.");
+ }
+ }else
+ {
+ LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value());
+ if(e.value() != 2)
+ {
+ LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
+ shutdown();
+ }
+ }
+ // If an error occurs then no new asynchronous operations are started. This
+ // means that all shared_ptr references to the connection object will
+ // disappear and the object will be destroyed automatically after this
+ // handler returns. The connection class's destructor closes the socket.
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_read", void());
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::call_run_once_service_io()
+ {
+ TRY_ENTRY();
+ if(!m_is_multithreaded)
+ {
+ //single thread model, we can wait in blocked call
+ size_t cnt = socket_.get_io_service().run_one();
+ if(!cnt)//service is going to quit
+ return false;
+ }else
+ {
+ //multi thread model, we can't(!) wait in blocked call
+ //so we make non blocking call and releasing CPU by calling sleep(0);
+ //if no handlers were called
+ //TODO: Maybe we need to have have critical section + event + callback to upper protocol to
+ //ask it inside(!) critical region if we still able to go in event wait...
+ size_t cnt = socket_.get_io_service().poll_one();
+ if(!cnt)
+ misc_utils::sleep_no_w(0);
+ }
+
+ return true;
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb)
+ {
+ TRY_ENTRY();
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if(!self)
+ return false;
+ if(m_was_shutdown)
+ return false;
+
+ LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4);
+ //some data should be wrote to stream
+ //request complete
+
+ epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock);
+ if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
+ {
+ send_guard.unlock();
+ LOG_ERROR("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
+ close();
+ return false;
+ }
+
+ m_send_que.resize(m_send_que.size()+1);
+ m_send_que.back().assign((const char*)ptr, cb);
+
+ if(m_send_que.size() > 1)
+ {
+ //active operation should be in progress, nothing to do, just wait last operation callback
+ }else
+ {
+ //no active operation
+ if(m_send_que.size()!=1)
+ {
+ LOG_ERROR("Looks like no active operations, but send que size != 1!!");
+ return false;
+ }
+
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
+ //strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
+ //)
+ );
+
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
+ }
+
+ return true;
+
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::shutdown()
+ {
+ // Initiate graceful connection closure.
+ boost::system::error_code ignored_ec;
+ socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
+ m_was_shutdown = true;
+ m_protocol_handler.release_protocol();
+ return true;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::close()
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called.");
+ size_t send_que_size = 0;
+ CRITICAL_REGION_BEGIN(m_send_que_lock);
+ send_que_size = m_send_que.size();
+ CRITICAL_REGION_END();
+ boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1);
+ if(!send_que_size)
+ {
+ shutdown();
+ }
+
+ return true;
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::close", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void connection<t_protocol_handler>::handle_write(const boost::system::error_code& e, size_t cb)
+ {
+ TRY_ENTRY();
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send calledback " << cb);
+
+ if (e)
+ {
+ LOG_PRINT_L0("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value());
+ shutdown();
+ return;
+ }
+
+ bool do_shutdown = false;
+ CRITICAL_REGION_BEGIN(m_send_que_lock);
+ if(m_send_que.empty())
+ {
+ LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!");
+ return;
+ }
+
+ m_send_que.pop_front();
+ if(m_send_que.empty())
+ {
+ if(boost::interprocess::ipcdetail::atomic_read32(&m_want_close_connection))
+ {
+ do_shutdown = true;
+ }
+ }else
+ {
+ //have more data to send
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
+ //strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2));
+ //);
+ }
+ CRITICAL_REGION_END();
+
+ if(do_shutdown)
+ {
+ shutdown();
+ }
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void());
+ }
+ /************************************************************************/
+ /* */
+ /************************************************************************/
+ template<class t_protocol_handler>
+ boosted_tcp_server<t_protocol_handler>::boosted_tcp_server():
+ m_io_service_local_instance(new boost::asio::io_service()),
+ io_service_(*m_io_service_local_instance.get()),
+ acceptor_(io_service_),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
+ m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ {
+ m_thread_name_prefix = "NET";
+ }
+
+ template<class t_protocol_handler>
+ boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service):
+ io_service_(extarnal_io_service),
+ acceptor_(io_service_),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
+ m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ {
+ m_thread_name_prefix = "NET";
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ boosted_tcp_server<t_protocol_handler>::~boosted_tcp_server()
+ {
+ this->send_stop_signal();
+ timed_wait_server_stop(10000);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address)
+ {
+ TRY_ENTRY();
+ m_stop_signal_sent = false;
+ m_port = port;
+ m_address = address;
+ // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
+ boost::asio::ip::tcp::resolver resolver(io_service_);
+ boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast<std::string>(port));
+ boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
+ acceptor_.open(endpoint.protocol());
+ acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+ acceptor_.bind(endpoint);
+ acceptor_.listen();
+ boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint();
+ m_port = binded_endpoint.port();
+ acceptor_.async_accept(new_connection_->socket(),
+ boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
+ boost::asio::placeholders::error));
+
+ return true;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::init_server", false);
+ }
+ //-----------------------------------------------------------------------------
+PUSH_WARNINGS
+DISABLE_GCC_WARNING(maybe-uninitialized)
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address)
+ {
+ uint32_t p = 0;
+
+ if (port.size() && !string_tools::get_xtype_from_string(p, port)) {
+ return false;
+ LOG_ERROR("Failed to convert port no = port");
+ }
+ return this->init_server(p, address);
+ }
+POP_WARNINGS
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::worker_thread()
+ {
+ TRY_ENTRY();
+ uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index);
+ std::string thread_name = std::string("[") + m_thread_name_prefix;
+ thread_name += boost::to_string(local_thr_index) + "]";
+ log_space::log_singletone::set_thread_log_prefix(thread_name);
+ while(!m_stop_signal_sent)
+ {
+ try
+ {
+ io_service_.run();
+ }
+ catch(const std::exception& ex)
+ {
+ LOG_ERROR("Exception at server worker thread, what=" << ex.what());
+ }
+ catch(...)
+ {
+ LOG_ERROR("Exception at server worker thread, unknown execption");
+ }
+ }
+ LOG_PRINT_L4("Worker thread finished");
+ return true;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name)
+ {
+ m_thread_name_prefix = prefix_name;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::set_connection_filter(i_connection_filter* pfilter)
+ {
+ m_pfilter = pfilter;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::run_server(size_t threads_count, bool wait)
+ {
+ TRY_ENTRY();
+ m_threads_count = threads_count;
+ m_main_thread_id = boost::this_thread::get_id();
+ log_space::log_singletone::set_thread_log_prefix("[SRV_MAIN]");
+ while(!m_stop_signal_sent)
+ {
+
+ // Create a pool of threads to run all of the io_services.
+ CRITICAL_REGION_BEGIN(m_threads_lock);
+ for (std::size_t i = 0; i < threads_count; ++i)
+ {
+ boost::shared_ptr<boost::thread> thread(new boost::thread(
+ boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this)));
+ m_threads.push_back(thread);
+ }
+ CRITICAL_REGION_END();
+ // Wait for all threads in the pool to exit.
+ if(wait)
+ {
+ for (std::size_t i = 0; i < m_threads.size(); ++i)
+ m_threads[i]->join();
+ m_threads.clear();
+
+ }else
+ {
+ return true;
+ }
+
+ if(wait && !m_stop_signal_sent)
+ {
+ //some problems with the listening socket ?..
+ LOG_PRINT_L0("Net service stopped without stop request, restarting...");
+ if(!this->init_server(m_port, m_address))
+ {
+ LOG_PRINT_L0("Reiniting service failed, exit.");
+ return false;
+ }else
+ {
+ LOG_PRINT_L0("Reiniting OK.");
+ }
+ }
+ }
+ return true;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::run_server", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::is_thread_worker()
+ {
+ TRY_ENTRY();
+ CRITICAL_REGION_LOCAL(m_threads_lock);
+ BOOST_FOREACH(boost::shared_ptr<boost::thread>& thp, m_threads)
+ {
+ if(thp->get_id() == boost::this_thread::get_id())
+ return true;
+ }
+ if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id)
+ return true;
+ return false;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::is_thread_worker", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop(boost::uint64_t wait_mseconds)
+ {
+ TRY_ENTRY();
+ boost::chrono::milliseconds ms(wait_mseconds);
+ for (std::size_t i = 0; i < m_threads.size(); ++i)
+ {
+ if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms))
+ {
+ LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle());
+ m_threads[i]->interrupt();
+ }
+ }
+ return true;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
+ {
+ m_stop_signal_sent = true;
+ TRY_ENTRY();
+ io_service_.stop();
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::is_stop_signal_sent()
+ {
+ return m_stop_signal_sent;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e)
+ {
+ TRY_ENTRY();
+ if (!e)
+ {
+ connection_ptr conn(std::move(new_connection_));
+
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ acceptor_.async_accept(new_connection_->socket(),
+ boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
+ boost::asio::placeholders::error));
+
+ bool r = conn->start(true, 1 < m_threads_count);
+ if (!r)
+ LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ }else
+ {
+ LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count);
+ }
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void());
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, boost::uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip)
+ {
+ TRY_ENTRY();
+
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
+
+ //////////////////////////////////////////////////////////////////////////
+ boost::asio::ip::tcp::resolver resolver(io_service_);
+ boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port);
+ boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
+ boost::asio::ip::tcp::resolver::iterator end;
+ if(iterator == end)
+ {
+ LOG_ERROR("Failed to resolve " << adr);
+ return false;
+ }
+ //////////////////////////////////////////////////////////////////////////
+
+
+ //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port);
+ boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
+
+ sock_.open(remote_endpoint.protocol());
+ if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
+ {
+ boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0);
+ sock_.bind(local_endpoint);
+ }
+
+ /*
+ NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service.
+ */
+
+ boost::system::error_code ec = boost::asio::error::would_block;
+
+ //have another free thread(s), work in wait mode, without event handling
+ struct local_async_context
+ {
+ boost::system::error_code ec;
+ boost::mutex connect_mut;
+ boost::condition_variable cond;
+ };
+
+ boost::shared_ptr<local_async_context> local_shared_context(new local_async_context());
+ local_shared_context->ec = boost::asio::error::would_block;
+ boost::unique_lock<boost::mutex> lock(local_shared_context->connect_mut);
+ auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr<local_async_context> shared_context)
+ {
+ shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->connect_mut.unlock(); shared_context->cond.notify_one();
+ };
+
+ sock_.async_connect(remote_endpoint, boost::bind<void>(connect_callback, _1, local_shared_context));
+ while(local_shared_context->ec == boost::asio::error::would_block)
+ {
+ bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout));
+ if(local_shared_context->ec == boost::asio::error::would_block && !r)
+ {
+ //timeout
+ sock_.close();
+ LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
+ return false;
+ }
+ }
+ ec = local_shared_context->ec;
+
+ if (ec || !sock_.is_open())
+ {
+ LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3);
+ return false;
+ }
+
+ LOG_PRINT_L3("Connected success to " << adr << ':' << port);
+
+ bool r = new_connection_l->start(false, 1 < m_threads_count);
+ if (r)
+ {
+ new_connection_l->get_context(conn_context);
+ //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ }
+ else
+ {
+ LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ }
+
+ return r;
+
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler> template<class t_callback>
+ bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, boost::uint32_t conn_timeout, t_callback cb, const std::string& bind_ip)
+ {
+ TRY_ENTRY();
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
+
+ //////////////////////////////////////////////////////////////////////////
+ boost::asio::ip::tcp::resolver resolver(io_service_);
+ boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port);
+ boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
+ boost::asio::ip::tcp::resolver::iterator end;
+ if(iterator == end)
+ {
+ LOG_ERROR("Failed to resolve " << adr);
+ return false;
+ }
+ //////////////////////////////////////////////////////////////////////////
+ boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
+
+ sock_.open(remote_endpoint.protocol());
+ if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
+ {
+ boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0);
+ sock_.bind(local_endpoint);
+ }
+
+ boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_service_));
+ //start deadline
+ sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout));
+ sh_deadline->async_wait([=](const boost::system::error_code& error)
+ {
+ if(error != boost::asio::error::operation_aborted)
+ {
+ LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
+ new_connection_l->socket().close();
+ }
+ });
+ //start async connect
+ sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_)
+ {
+ t_connection_context conn_context = AUTO_VAL_INIT(conn_context);
+ boost::system::error_code ignored_ec;
+ boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec);
+ if(!ec_)
+ {//success
+ if(!sh_deadline->cancel())
+ {
+ cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation
+ }else
+ {
+ LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
+ " from " << lep.address().to_string() << ':' << lep.port());
+ bool r = new_connection_l->start(false, 1 < m_threads_count);
+ if (r)
+ {
+ new_connection_l->get_context(conn_context);
+ cb(conn_context, ec_);
+ }
+ else
+ {
+ LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
+ cb(conn_context, boost::asio::error::fault);
+ }
+ }
+ }else
+ {
+ LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
+ " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value());
+ cb(conn_context, ec_);
+ }
+ });
+ return true;
+ CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
+ }
+}
+}
+PRAGMA_WARNING_POP