aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/levin_client_async.h
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net/levin_client_async.h')
-rw-r--r--contrib/epee/include/net/levin_client_async.h577
1 files changed, 577 insertions, 0 deletions
diff --git a/contrib/epee/include/net/levin_client_async.h b/contrib/epee/include/net/levin_client_async.h
new file mode 100644
index 000000000..b02fa7ee7
--- /dev/null
+++ b/contrib/epee/include/net/levin_client_async.h
@@ -0,0 +1,577 @@
+// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the Andrey N. Sabelnikov nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+
+#pragma once
+
+#include ""
+#include "net_helper.h"
+#include "levin_base.h"
+
+
+namespace epee
+{
+namespace levin
+{
+
+ /************************************************************************
+ * levin_client_async - probably it is not really fast implementation,
+ * each handler thread could make up to 30 ms latency.
+ * But, handling events in reader thread will cause dead locks in
+ * case of recursive call (call invoke() to the same connection
+ * on reader thread on remote invoke() handler)
+ ***********************************************************************/
+
+
+ class levin_client_async
+ {
+ levin_commands_handler* m_pcommands_handler;
+ volatile boost::uint32_t m_is_stop;
+ volatile boost::uint32_t m_threads_count;
+ ::critical_section m_send_lock;
+
+ std::string m_local_invoke_buff;
+ ::critical_section m_local_invoke_buff_lock;
+ volatile int m_invoke_res;
+
+ volatile boost::uint32_t m_invoke_data_ready;
+ volatile boost::uint32_t m_invoke_is_active;
+
+ boost::mutex m_invoke_event;
+ boost::condition_variable m_invoke_cond;
+ size_t m_timeout;
+
+ ::critical_section m_recieved_packets_lock;
+ struct packet_entry
+ {
+ bucket_head m_hd;
+ std::string m_body;
+ boost::uint32_t m_connection_index;
+ };
+ std::list<packet_entry> m_recieved_packets;
+ /*
+ m_current_connection_index needed when some connection was broken and reconnected - in this
+ case we could have some received packets in que, which shoud not be handled
+ */
+ volatile boost::uint32_t m_current_connection_index;
+ ::critical_section m_invoke_lock;
+ ::critical_section m_reciev_packet_lock;
+ ::critical_section m_connection_lock;
+ net_utils::blocked_mode_client m_transport;
+ public:
+ levin_client_async():m_pcommands_handler(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
+ {}
+ levin_client_async(const levin_client_async& /*v*/):m_pcommands_handler(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
+ {}
+ ~levin_client_async()
+ {
+ boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);
+ disconnect();
+
+
+ while(boost::interprocess::ipcdetail::atomic_read32(&m_threads_count))
+ ::Sleep(100);
+ }
+
+ void set_handler(levin_commands_handler* phandler)
+ {
+ m_pcommands_handler = phandler;
+ }
+
+ bool connect(boost::uint32_t ip, boost::uint32_t port, boost::uint32_t timeout)
+ {
+ loop_call_guard();
+ critical_region cr(m_connection_lock);
+
+ m_timeout = timeout;
+ bool res = false;
+ CRITICAL_REGION_BEGIN(m_reciev_packet_lock);
+ CRITICAL_REGION_BEGIN(m_send_lock);
+ res = levin_client_impl::connect(ip, port, timeout);
+ boost::interprocess::ipcdetail::atomic_inc32(&m_current_connection_index);
+ CRITICAL_REGION_END();
+ CRITICAL_REGION_END();
+ if(res && !boost::interprocess::ipcdetail::atomic_read32(&m_threads_count) )
+ {
+ //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 0);//m_is_stop = false;
+ boost::thread( boost::bind(&levin_duplex_client::reciever_thread, this) );
+ boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
+ boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
+ }
+
+ return res;
+ }
+ bool is_connected()
+ {
+ loop_call_guard();
+ critical_region cr(m_cs);
+ return levin_client_impl::is_connected();
+ }
+
+ inline
+ bool check_connection()
+ {
+ loop_call_guard();
+ critical_region cr(m_cs);
+
+ if(!is_connected())
+ {
+ if( !reconnect() )
+ {
+ LOG_ERROR("Reconnect Failed. Failed to invoke() becouse not connected!");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ //------------------------------------------------------------------------------
+ inline
+ bool recv_n(SOCKET s, char* pbuff, size_t cb)
+ {
+ while(cb)
+ {
+ int res = ::recv(m_socket, pbuff, (int)cb, 0);
+
+ if(SOCKET_ERROR == res)
+ {
+ if(!m_connected)
+ return false;
+
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to recv(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ disconnect();
+ //reconnect();
+ return false;
+ }else if(res == 0)
+ {
+ disconnect();
+ //reconnect();
+ return false;
+ }
+ LOG_PRINT_L4("[" << m_socket <<"] RECV " << res);
+ cb -= res;
+ pbuff += res;
+ }
+
+ return true;
+ }
+
+ //------------------------------------------------------------------------------
+ inline
+ bool recv_n(SOCKET s, std::string& buff)
+ {
+ size_t cb_remain = buff.size();
+ char* m_current_ptr = (char*)buff.data();
+ return recv_n(s, m_current_ptr, cb_remain);
+ }
+
+ bool disconnect()
+ {
+ //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);//m_is_stop = true;
+ loop_call_guard();
+ critical_region cr(m_cs);
+ levin_client_impl::disconnect();
+
+ CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
+ m_local_invoke_buff.clear();
+ m_invoke_res = LEVIN_ERROR_CONNECTION_DESTROYED;
+ CRITICAL_REGION_END();
+ boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
+ m_invoke_cond.notify_all();
+ return true;
+ }
+
+ void loop_call_guard()
+ {
+
+ }
+
+ void on_leave_invoke()
+ {
+ boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 0);
+ }
+
+ int invoke(const GUID& target, int command, const std::string& in_buff, std::string& buff_out)
+ {
+
+ critical_region cr_invoke(m_invoke_lock);
+
+ boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 1);
+ boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 0);
+ misc_utils::destr_ptr hdlr = misc_utils::add_exit_scope_handler(boost::bind(&levin_duplex_client::on_leave_invoke, this));
+
+ loop_call_guard();
+
+ if(!check_connection())
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+
+
+ bucket_head head = {0};
+ head.m_signature = LEVIN_SIGNATURE;
+ head.m_cb = in_buff.size();
+ head.m_have_to_return_data = true;
+ head.m_id = target;
+#ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
+ ::UuidCreate(&head.m_id);
+#endif
+ head.m_command = command;
+ head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
+ head.m_flags = LEVIN_PACKET_REQUEST;
+ LOG_PRINT("[" << m_socket <<"] Sending invoke data", LOG_LEVEL_4);
+
+ CRITICAL_REGION_BEGIN(m_send_lock);
+ LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
+ int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ disconnect();
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+ }
+ LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
+ res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ disconnect();
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+ }
+ CRITICAL_REGION_END();
+ LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
+
+ //hard coded timeout in 10 minutes for maximum invoke period. if it happens, it could mean only some real troubles.
+ boost::system_time timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
+ size_t timeout_count = 0;
+ boost::unique_lock<boost::mutex> lock(m_invoke_event);
+
+ while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_data_ready))
+ {
+ if(!m_invoke_cond.timed_wait(lock, timeout))
+ {
+ if(timeout_count < 10)
+ {
+ //workaround to avoid freezing at timed_wait called after notify_all.
+ timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
+ ++timeout_count;
+ continue;
+ }else if(timeout_count == 10)
+ {
+ //workaround to avoid freezing at timed_wait called after notify_all.
+ timeout = boost::get_system_time()+ boost::posix_time::minutes(10);
+ ++timeout_count;
+ continue;
+ }else
+ {
+ LOG_PRINT("[" << m_socket <<"] Timeout on waiting invoke result. ", LOG_LEVEL_0);
+ //disconnect();
+ return LEVIN_ERROR_CONNECTION_TIMEDOUT;
+ }
+ }
+ }
+
+
+ CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
+ buff_out.swap(m_local_invoke_buff);
+ m_local_invoke_buff.clear();
+ CRITICAL_REGION_END();
+ return m_invoke_res;
+ }
+
+ int notify(const GUID& target, int command, const std::string& in_buff)
+ {
+ if(!check_connection())
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+
+ bucket_head head = {0};
+ head.m_signature = LEVIN_SIGNATURE;
+ head.m_cb = in_buff.size();
+ head.m_have_to_return_data = false;
+ head.m_id = target;
+#ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
+ ::UuidCreate(&head.m_id);
+#endif
+ head.m_command = command;
+ head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
+ head.m_flags = LEVIN_PACKET_REQUEST;
+ CRITICAL_REGION_BEGIN(m_send_lock);
+ LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
+ int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ disconnect();
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+ }
+ LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
+ res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
+ if(SOCKET_ERROR == res)
+ {
+ int err = ::WSAGetLastError();
+ LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
+ disconnect();
+ return LEVIN_ERROR_CONNECTION_DESTROYED;
+ }
+ CRITICAL_REGION_END();
+ LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
+
+ return 1;
+ }
+
+
+ private:
+ bool have_some_data(SOCKET sock, int interval = 1)
+ {
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(sock, &fds);
+
+ fd_set fdse;
+ FD_ZERO(&fdse);
+ FD_SET(sock, &fdse);
+
+
+ timeval tv;
+ tv.tv_sec = interval;
+ tv.tv_usec = 0;
+
+ int sel_res = select(0, &fds, 0, &fdse, &tv);
+ if(0 == sel_res)
+ return false;
+ else if(sel_res == SOCKET_ERROR)
+ {
+ if(m_is_stop)
+ return false;
+ int err_code = ::WSAGetLastError();
+ LOG_ERROR("Filed to call select, err code = " << err_code);
+ disconnect();
+ }else
+ {
+ if(fds.fd_array[0])
+ {//some read operations was performed
+ return true;
+ }else if(fdse.fd_array[0])
+ {//some error was at the socket
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ bool reciev_and_process_incoming_data()
+ {
+ bucket_head head = {0};
+ boost::uint32_t conn_index = 0;
+ bool is_request = false;
+ std::string local_buff;
+ CRITICAL_REGION_BEGIN(m_reciev_packet_lock);//to protect from socket reconnect between head and body
+
+ if(!recv_n(m_socket, (char*)&head, sizeof(head)))
+ {
+ if(m_is_stop)
+ return false;
+ LOG_ERROR("Failed to recv_n");
+ return false;
+ }
+
+ conn_index = boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index);
+
+ if(head.m_signature!=LEVIN_SIGNATURE)
+ {
+ LOG_ERROR("Signature missmatch in response");
+ return false;
+ }
+
+ is_request = (head.m_protocol_version == LEVIN_PROTOCOL_VER_1 && head.m_flags&LEVIN_PACKET_REQUEST);
+
+
+ local_buff.resize((size_t)head.m_cb);
+ if(!recv_n(m_socket, local_buff))
+ {
+ if(m_is_stop)
+ return false;
+ LOG_ERROR("Filed to reciev");
+ return false;
+ }
+ CRITICAL_REGION_END();
+
+ LOG_PRINT_L4("LEVIN_PACKET_RECIEVED. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
+
+ if(is_request)
+ {
+ CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
+ m_recieved_packets.resize(m_recieved_packets.size() + 1);
+ m_recieved_packets.back().m_hd = head;
+ m_recieved_packets.back().m_body.swap(local_buff);
+ m_recieved_packets.back().m_connection_index = conn_index;
+ CRITICAL_REGION_END();
+ /*
+
+ */
+ }else
+ {//this is some response
+
+ CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
+ m_local_invoke_buff.swap(local_buff);
+ m_invoke_res = head.m_return_code;
+ CRITICAL_REGION_END();
+ boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
+ m_invoke_cond.notify_all();
+
+ }
+ return true;
+ }
+
+ bool reciever_thread()
+ {
+ LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread started.[m_threads_count=" << m_threads_count << "]");
+ log_space::log_singletone::set_thread_log_prefix("RECIEVER_WORKER");
+ boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
+
+ while(!m_is_stop)
+ {
+ if(!m_connected)
+ {
+ Sleep(100);
+ continue;
+ }
+
+ if(have_some_data(m_socket, 1))
+ {
+ if(!reciev_and_process_incoming_data())
+ {
+ if(m_is_stop)
+ {
+ break;//boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
+ //return true;
+ }
+ LOG_ERROR("Failed to reciev_and_process_incoming_data. shutting down");
+ //boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
+ //disconnect_no_wait();
+ //break;
+ }
+ }
+ }
+
+ boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
+ LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread stopped.[m_threads_count=" << m_threads_count << "]");
+ return true;
+ }
+
+ bool process_recieved_packet(bucket_head& head, const std::string& local_buff, boost::uint32_t conn_index)
+ {
+
+ net_utils::connection_context_base conn_context;
+ conn_context.m_remote_ip = m_ip;
+ conn_context.m_remote_port = m_port;
+ if(head.m_have_to_return_data)
+ {
+ std::string return_buff;
+ if(m_pcommands_handler)
+ head.m_return_code = m_pcommands_handler->invoke(head.m_id, head.m_command, local_buff, return_buff, conn_context);
+ else
+ head.m_return_code = LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED;
+
+
+
+ head.m_cb = return_buff.size();
+ head.m_have_to_return_data = false;
+ head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
+ head.m_flags = LEVIN_PACKET_RESPONSE;
+
+ std::string send_buff((const char*)&head, sizeof(head));
+ send_buff += return_buff;
+ CRITICAL_REGION_BEGIN(m_send_lock);
+ if(conn_index != boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index))
+ {//there was reconnect, send response back is not allowed
+ return true;
+ }
+ int res = ::send(m_socket, (const char*)send_buff.data(), send_buff.size(), 0);
+ if(res == SOCKET_ERROR)
+ {
+ int err_code = ::WSAGetLastError();
+ LOG_ERROR("Failed to send, err = " << err_code);
+ return false;
+ }
+ CRITICAL_REGION_END();
+ LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
+
+ }
+ else
+ {
+ if(m_pcommands_handler)
+ m_pcommands_handler->notify(head.m_id, head.m_command, local_buff, conn_context);
+ }
+
+ return true;
+ }
+
+ bool handler_thread()
+ {
+ LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread started.[m_threads_count=" << m_threads_count << "]");
+ log_space::log_singletone::set_thread_log_prefix("HANDLER_WORKER");
+ boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
+
+ while(!m_is_stop)
+ {
+ bool have_some_work = false;
+ std::string local_buff;
+ bucket_head bh = {0};
+ boost::uint32_t conn_index = 0;
+
+ CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
+ if(m_recieved_packets.size())
+ {
+ bh = m_recieved_packets.begin()->m_hd;
+ conn_index = m_recieved_packets.begin()->m_connection_index;
+ local_buff.swap(m_recieved_packets.begin()->m_body);
+ have_some_work = true;
+ m_recieved_packets.pop_front();
+ }
+ CRITICAL_REGION_END();
+
+ if(have_some_work)
+ {
+ process_recieved_packet(bh, local_buff, conn_index);
+ }else
+ {
+ //Idle when no work
+ Sleep(30);
+ }
+ }
+
+ boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
+ LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread stopped.[m_threads_count=" << m_threads_count << "]");
+ return true;
+ }
+ };
+
+}
+} \ No newline at end of file