aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/abstract_tcp_server_cp.inl
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server_cp.inl')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server_cp.inl605
1 files changed, 605 insertions, 0 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server_cp.inl b/contrib/epee/include/net/abstract_tcp_server_cp.inl
new file mode 100644
index 000000000..5673c50be
--- /dev/null
+++ b/contrib/epee/include/net/abstract_tcp_server_cp.inl
@@ -0,0 +1,605 @@
+// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the Andrey N. Sabelnikov nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+
+#pragma comment(lib, "Ws2_32.lib")
+
+namespace epee
+{
+namespace net_utils
+{
+template<class TProtocol>
+cp_server_impl<TProtocol>::cp_server_impl():
+ m_port(0), m_stop(false),
+ m_worker_thread_counter(0), m_listen_socket(INVALID_SOCKET)
+{
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+cp_server_impl<TProtocol>::~cp_server_impl()
+{
+ deinit_server();
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::init_server(int port_no)
+{
+ m_port = port_no;
+
+ WSADATA wsad = {0};
+ int err = ::WSAStartup(MAKEWORD(2,2), &wsad);
+ if ( err != 0 || LOBYTE( wsad.wVersion ) != 2 || HIBYTE( wsad.wVersion ) != 2 )
+ {
+ LOG_ERROR("Could not find a usable WinSock DLL, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ return false;
+ }
+
+ m_initialized = true;
+
+ m_listen_socket = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
+ if(INVALID_SOCKET == m_listen_socket)
+ {
+ err = ::WSAGetLastError();
+ LOG_ERROR("Failed to create socket, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ return false;
+ }
+
+
+ int opt = 1;
+ err = setsockopt (m_listen_socket, SOL_SOCKET,SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(int));
+ if(SOCKET_ERROR == err )
+ {
+ err = ::WSAGetLastError();
+ LOG_PRINT("Failed to setsockopt(SO_REUSEADDR), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1);
+ deinit_server();
+ return false;
+ }
+
+
+ sockaddr_in adr = {0};
+ adr.sin_family = AF_INET;
+ adr.sin_addr.s_addr = htonl(INADDR_ANY);
+ adr.sin_port = (u_short)htons(m_port);
+
+ //binding
+ err = bind(m_listen_socket, (const sockaddr*)&adr, sizeof(adr ));
+ if(SOCKET_ERROR == err )
+ {
+ err = ::WSAGetLastError();
+ LOG_PRINT("Failed to Bind, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1);
+ deinit_server();
+ return false;
+ }
+
+
+ m_completion_port = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if(INVALID_HANDLE_VALUE == m_completion_port)
+ {
+ err = ::WSAGetLastError();
+ LOG_PRINT("Failed to CreateIoCompletionPort, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1);
+ deinit_server();
+ return false;
+ }
+
+
+ return true;
+}
+//-------------------------------------------------------------
+
+//-------------------------------------------------------------
+static int CALLBACK CPConditionFunc(
+ IN LPWSABUF lpCallerId,
+ IN LPWSABUF lpCallerData,
+ IN OUT LPQOS lpSQOS,
+ IN OUT LPQOS lpGQOS,
+ IN LPWSABUF lpCalleeId,
+ OUT LPWSABUF lpCalleeData,
+ OUT GROUP FAR *g,
+ IN DWORD_PTR dwCallbackData
+ )
+{
+
+ /*cp_server_impl* pthis = (cp_server_impl*)dwCallbackData;
+ if(!pthis)
+ return CF_REJECT;*/
+ /*if(pthis->get_active_connections_num()>=FD_SETSIZE-1)
+ {
+ LOG_PRINT("Maximum connections count overfull.", LOG_LEVEL_2);
+ return CF_REJECT;
+ }*/
+
+ return CF_ACCEPT;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+size_t cp_server_impl<TProtocol>::get_active_connections_num()
+{
+ return m_connections.size();
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+unsigned CALLBACK cp_server_impl<TProtocol>::worker_thread(void* param)
+{
+ if(!param)
+ return 0;
+
+ cp_server_impl<TProtocol>* pthis = (cp_server_impl<TProtocol>*)param;
+ pthis->worker_thread_member();
+ return 1;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::worker_thread_member()
+{
+ LOG_PRINT("Worker thread STARTED", LOG_LEVEL_1);
+ bool stop_handling = false;
+ while(!stop_handling)
+ {
+ PROFILE_FUNC("[worker_thread]Worker Loop");
+ DWORD bytes_transfered = 0;
+ connection<TProtocol>* pconnection = 0;
+ io_data_base* pio_data = 0;
+
+ {
+ PROFILE_FUNC("[worker_thread]GetQueuedCompletionStatus");
+ BOOL res = ::GetQueuedCompletionStatus (m_completion_port, &bytes_transfered , (PULONG_PTR)&pconnection, (LPOVERLAPPED *)&pio_data, INFINITE);
+ if (res == 0)
+ {
+ // check return code for error
+ int err = GetLastError();
+ LOG_PRINT("GetQueuedCompletionStatus failed with error " << err << " " << log_space::get_win32_err_descr(err), LOG_LEVEL_1);
+
+ if(pio_data)
+ ::InterlockedExchange(&pio_data->m_is_in_use, 0);
+
+
+ continue;
+ }
+ }
+
+ if(pio_data)
+ ::InterlockedExchange(&pio_data->m_is_in_use, 0);
+
+
+
+ if(!bytes_transfered && !pconnection && !pio_data)
+ {
+ //signal to stop
+ break;
+ }
+ if(!pconnection || !pio_data)
+ {
+ LOG_PRINT("BIG FAIL: pconnection or pio_data is empty: pconnection=" << pconnection << " pio_data=" << pio_data, LOG_LEVEL_0);
+ break;
+ }
+
+
+
+ if(::InterlockedCompareExchange(&pconnection->m_connection_shutwoned, 0, 0))
+ {
+ LOG_ERROR("InterlockedCompareExchange(&pconnection->m_connection_shutwoned, 0, 0)");
+ //DebugBreak();
+ }
+
+ if(pio_data->m_op_type == op_type_stop)
+ {
+ if(!pconnection)
+ {
+ LOG_ERROR("op_type=op_type_stop, but pconnection is empty!!!");
+ continue;
+ }
+ shutdown_connection(pconnection);
+ continue;//
+ }
+ else if(pio_data->m_op_type == op_type_send)
+ {
+ continue;
+ //do nothing, just queuing request
+ }else if(pio_data->m_op_type == op_type_recv)
+ {
+ PROFILE_FUNC("[worker_thread]m_tprotocol_handler.handle_recv");
+ if(bytes_transfered)
+ {
+ bool res = pconnection->m_tprotocol_handler.handle_recv(pio_data->Buffer, bytes_transfered);
+ if(!res)
+ pconnection->query_shutdown();
+ }
+ else
+ {
+ pconnection->query_shutdown();
+ continue;
+ }
+
+ }
+
+ //preparing new request,
+
+ {
+ PROFILE_FUNC("[worker_thread]RECV Request small loop");
+ int res = 0;
+ while(true)
+ {
+ LOG_PRINT("Prepearing data for WSARecv....", LOG_LEVEL_3);
+ ZeroMemory(&pio_data->m_overlapped, sizeof(OVERLAPPED));
+ pio_data->DataBuf.len = pio_data->TotalBuffBytes;
+ pio_data->DataBuf.buf = pio_data->Buffer;
+ pio_data->m_op_type = op_type_recv;
+ //calling WSARecv() and go to completion waiting
+ DWORD bytes_recvd = 0;
+ DWORD flags = 0;
+
+ LOG_PRINT("Calling WSARecv....", LOG_LEVEL_3);
+ ::InterlockedExchange(&pio_data->m_is_in_use, 1);
+ res = WSARecv(pconnection->m_sock, &(pio_data->DataBuf), 1, &bytes_recvd , &flags, &(pio_data->m_overlapped), NULL);
+ if(res == SOCKET_ERROR )
+ {
+ int err = ::WSAGetLastError();
+ if(WSA_IO_PENDING == err )
+ {//go pending, ok
+ LOG_PRINT("WSARecv return WSA_IO_PENDING", LOG_LEVEL_3);
+ break;
+ }
+ LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err);
+ ::InterlockedExchange(&pio_data->m_is_in_use, 0);
+ pconnection->query_shutdown();
+ break;
+ }
+ break;
+ /*else if(0 == res)
+ {
+ if(!bytes_recvd)
+ {
+ ::InterlockedExchange(&pio_data->m_is_in_use, 0);
+ LOG_PRINT("WSARecv return 0, bytes_recvd=0, graceful close.", LOG_LEVEL_3);
+ int err = ::WSAGetLastError();
+ //LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err);
+ //pconnection->query_shutdown();
+ break;
+ }else
+ {
+ LOG_PRINT("WSARecv return immediatily 0, bytes_recvd=" << bytes_recvd, LOG_LEVEL_3);
+ //pconnection->m_tprotocol_handler.handle_recv(pio_data->Buffer, bytes_recvd);
+ }
+ }*/
+ }
+ }
+ }
+
+
+ LOG_PRINT("Worker thread STOPED", LOG_LEVEL_1);
+ ::InterlockedDecrement(&m_worker_thread_counter);
+ return true;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::shutdown_connection(connection<TProtocol>* pconn)
+{
+ PROFILE_FUNC("[shutdown_connection]");
+
+ if(!pconn)
+ {
+ LOG_ERROR("Attempt to remove null pptr connection!");
+ return false;
+ }
+ else
+ {
+ LOG_PRINT("Shutting down connection ("<< pconn << ")", LOG_LEVEL_3);
+ }
+ m_connections_lock.lock();
+ connections_container::iterator it = m_connections.find(pconn->m_sock);
+ m_connections_lock.unlock();
+ if(it == m_connections.end())
+ {
+ LOG_ERROR("Failed to find closing socket=" << pconn->m_sock);
+ return false;
+ }
+ SOCKET sock = it->second->m_sock;
+ {
+ PROFILE_FUNC("[shutdown_connection] shutdown, close");
+ ::shutdown(it->second->m_sock, SD_SEND );
+ }
+ size_t close_sock_wait_count = 0;
+ {
+ LOG_PRINT("Entered to 'in_use wait zone'", LOG_LEVEL_3);
+ PROFILE_FUNC("[shutdown_connection] wait for in_use");
+ while(::InterlockedCompareExchange(&it->second->m_precv_data->m_is_in_use, 1, 1))
+ {
+
+ Sleep(100);
+ close_sock_wait_count++;
+ }
+ LOG_PRINT("First step to 'in_use wait zone'", LOG_LEVEL_3);
+
+
+ while(::InterlockedCompareExchange(&it->second->m_psend_data->m_is_in_use, 1, 1))
+ {
+ Sleep(100);
+ close_sock_wait_count++;
+ }
+ LOG_PRINT("Leaved 'in_use wait zone'", LOG_LEVEL_3);
+ }
+
+ ::closesocket(it->second->m_sock);
+
+ ::InterlockedExchange(&it->second->m_connection_shutwoned, 1);
+ m_connections_lock.lock();
+ m_connections.erase(it);
+ m_connections_lock.unlock();
+ LOG_PRINT("Socked " << sock << " closed, wait_count=" << close_sock_wait_count, LOG_LEVEL_2);
+ return true;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::run_server(int threads_count = 0)
+{
+ int err = listen(m_listen_socket, 100);
+ if(SOCKET_ERROR == err )
+ {
+ err = ::WSAGetLastError();
+ LOG_ERROR("Failed to listen, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ return false;
+ }
+
+ if(!threads_count)
+ {
+ SYSTEM_INFO si = {0};
+ ::GetSystemInfo(&si);
+ threads_count = si.dwNumberOfProcessors + 2;
+ }
+ for(int i = 0; i != threads_count; i++)
+ {
+ boost::thread(boost::bind(&cp_server_impl::worker_thread_member, this));
+ //HANDLE h_thread = threads_helper::create_thread(worker_thread, this);
+ InterlockedIncrement(&m_worker_thread_counter);
+ //::CloseHandle(h_thread);
+ }
+
+ LOG_PRINT("Numbers of worker threads started: " << threads_count, LOG_LEVEL_1);
+
+ m_stop = false;
+ while(!m_stop)
+ {
+ PROFILE_FUNC("[run_server] main_loop");
+ TIMEVAL tv = {0};
+ tv.tv_sec = 0;
+ tv.tv_usec = 100;
+ fd_set sock_set;
+ sock_set.fd_count = 1;
+ sock_set.fd_array[0] = m_listen_socket;
+ int select_res = 0;
+ {
+ PROFILE_FUNC("[run_server] select");
+ select_res = select(0, &sock_set, &sock_set, NULL, &tv);
+ }
+
+ if(SOCKET_ERROR == select_res)
+ {
+ err = ::WSAGetLastError();
+ LOG_ERROR("Failed to select, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ return false;
+ }
+ if(!select_res)
+ {
+ on_net_idle();
+ continue;
+ }
+ else
+ {
+ sockaddr_in adr_from = {0};
+ int adr_len = sizeof(adr_from);
+ SOCKET new_sock = INVALID_SOCKET;
+ {
+ PROFILE_FUNC("[run_server] WSAAccept");
+ new_sock = ::WSAAccept(m_listen_socket, (sockaddr *)&adr_from, &adr_len, CPConditionFunc, (DWORD_PTR)this);
+ }
+
+ if(INVALID_SOCKET == new_sock)
+ {
+ if(m_stop)
+ break;
+ int err = ::WSAGetLastError();
+ LOG_PRINT("Failed to WSAAccept, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_2);
+ continue;
+ }
+ LOG_PRINT("Accepted connection (new socket=" << new_sock << ")", LOG_LEVEL_2);
+ {
+ PROFILE_FUNC("[run_server] Add new connection");
+ add_new_connection(new_sock, adr_from.sin_addr.s_addr, adr_from.sin_port);
+ }
+
+ }
+
+ }
+ LOG_PRINT("Closing connections("<< m_connections.size() << ") and waiting...", LOG_LEVEL_2);
+ m_connections_lock.lock();
+ for(connections_container::iterator it = m_connections.begin(); it != m_connections.end(); it++)
+ {
+ ::shutdown(it->second->m_sock, SD_BOTH);
+ ::closesocket(it->second->m_sock);
+ }
+ m_connections_lock.unlock();
+ size_t wait_count = 0;
+ while(m_connections.size() && wait_count < 100)
+ {
+ ::Sleep(100);
+ wait_count++;
+ }
+ LOG_PRINT("Connections closed OK (wait_count=" << wait_count << ")", LOG_LEVEL_2);
+
+
+ LOG_PRINT("Stopping worker threads("<< m_worker_thread_counter << ").", LOG_LEVEL_2);
+ for(int i = 0; i<m_worker_thread_counter; i++)
+ {
+ ::PostQueuedCompletionStatus(m_completion_port, 0, 0, 0);
+ }
+
+ wait_count = 0;
+ while(InterlockedCompareExchange(&m_worker_thread_counter, 0, 0) && wait_count < 100)
+ {
+ Sleep(100);
+ wait_count++;
+ }
+
+ LOG_PRINT("Net Server STOPPED, wait_count = " << wait_count, LOG_LEVEL_1);
+ return true;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::add_new_connection(SOCKET new_sock, long ip_from, int port_from)
+{
+ PROFILE_FUNC("[add_new_connection]");
+
+ LOG_PRINT("Add new connection zone: entering lock", LOG_LEVEL_3);
+ m_connections_lock.lock();
+
+ boost::shared_ptr<connection<TProtocol> > ptr;
+ ptr.reset(new connection<TProtocol>(m_config));
+
+ connection<TProtocol>& conn = *ptr.get();
+ m_connections[new_sock] = ptr;
+ LOG_PRINT("Add new connection zone: leaving lock", LOG_LEVEL_3);
+ m_connections_lock.unlock();
+ conn.init_buffers();
+ conn.m_sock = new_sock;
+ conn.context.m_remote_ip = ip_from;
+ conn.context.m_remote_port = port_from;
+ conn.m_completion_port = m_completion_port;
+ {
+ PROFILE_FUNC("[add_new_connection] CreateIoCompletionPort");
+ ::CreateIoCompletionPort((HANDLE)new_sock, m_completion_port, (ULONG_PTR)&conn, 0);
+ }
+
+ //if(NULL == ::CreateIoCompletionPort((HANDLE)new_sock, m_completion_port, (ULONG_PTR)&conn, 0))
+ //{
+ // int err = ::GetLastError();
+ // LOG_PRINT("Failed to CreateIoCompletionPort(associate socket and completion port), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_2);
+ // return false;
+ //}
+
+ conn.m_tprotocol_handler.after_init_connection();
+ {
+ PROFILE_FUNC("[add_new_connection] starting loop");
+ int res = 0;
+ while(true)//res!=SOCKET_ERROR)
+ {
+ PROFILE_FUNC("[add_new_connection] in loop time");
+ conn.m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE;
+ ZeroMemory(&conn.m_precv_data->m_overlapped, sizeof(OVERLAPPED));
+ conn.m_precv_data->DataBuf.len = conn.m_precv_data->TotalBuffBytes;
+ conn.m_precv_data->DataBuf.buf = conn.m_precv_data->Buffer;
+ conn.m_precv_data->m_op_type = op_type_recv;
+ InterlockedExchange(&conn.m_precv_data->m_is_in_use, 1);
+ DWORD bytes_recvd = 0;
+ DWORD flags = 0;
+
+ ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 1);
+ {
+ PROFILE_FUNC("[add_new_connection] ::WSARecv");
+ res = ::WSARecv(conn.m_sock, &(conn.m_precv_data->DataBuf), 1, &bytes_recvd , &flags, &(conn.m_precv_data->m_overlapped), NULL);
+ }
+ if(res == SOCKET_ERROR )
+ {
+ int err = ::WSAGetLastError();
+ if(WSA_IO_PENDING == err )
+ {
+ break;
+ }
+ LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err << " " << log_space::get_win32_err_descr(err));
+ ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 0);
+ conn.query_shutdown();
+ //shutdown_connection(&conn);
+ break;
+ }
+
+
+ break;
+ /*else if(0 == res)
+ {
+ if(!bytes_recvd)
+ {
+ PROFILE_FUNC("[add_new_connection] shutdown_connection");
+ ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 0);
+ conn.query_shutdown();
+ //shutdown_connection(&conn);
+ break;
+ }else
+ {
+ PROFILE_FUNC("[add_new_connection] handle_recv");
+ }
+ }*/
+ }
+ }
+
+
+
+ return true;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::deinit_server()
+{
+ if(!m_initialized)
+ return true;
+
+ if(INVALID_SOCKET != m_listen_socket)
+ {
+ shutdown(m_listen_socket, SD_BOTH);
+ int res = closesocket(m_listen_socket);
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to closesocket(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ }
+ m_listen_socket = INVALID_SOCKET;
+ }
+
+ int res = ::WSACleanup();
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to WSACleanup(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ }
+ m_initialized = false;
+
+ return true;
+}
+
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::send_stop_signal()
+{
+ ::InterlockedExchange(&m_stop, 1);
+ return true;
+}
+//-------------------------------------------------------------
+template<class TProtocol>
+bool cp_server_impl<TProtocol>::is_stop_signal()
+{
+ return m_stop?true:false;
+}
+//-------------------------------------------------------------
+}
+} \ No newline at end of file