diff options
Diffstat (limited to 'contrib/epee/include/net/levin_protocol_handler_async.h')
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 778 |
1 files changed, 778 insertions, 0 deletions
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h new file mode 100644 index 000000000..dc4f41146 --- /dev/null +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -0,0 +1,778 @@ +// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the Andrey N. Sabelnikov nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#pragma once +#include <boost/uuid/uuid_generators.hpp> +#include <boost/interprocess/detail/atomic.hpp> +#include <boost/smart_ptr/make_shared.hpp> + +#include <atomic> + +#include "levin_base.h" +#include "misc_language.h" + + +namespace epee +{ +namespace levin +{ + +/************************************************************************/ +/* */ +/************************************************************************/ +template<class t_connection_context> +class async_protocol_handler; + +template<class t_connection_context> +class async_protocol_handler_config +{ + typedef std::map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map; + critical_section m_connects_lock; + connections_map m_connects; + + void add_connection(async_protocol_handler<t_connection_context>* pc); + void del_connection(async_protocol_handler<t_connection_context>* pc); + + async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const; + int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph); + + friend class async_protocol_handler<t_connection_context>; + +public: + typedef t_connection_context connection_context; + levin_commands_handler<t_connection_context>* m_pcommands_handler; + boost::uint64_t m_max_packet_size; + boost::uint64_t m_invoke_timeout; + + int invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id); + template<class callback_t> + int invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, callback_t cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + + int notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id); + bool close(boost::uuids::uuid connection_id); + bool update_connection_context(const t_connection_context& contxt); + bool request_callback(boost::uuids::uuid connection_id); + template<class callback_t> + bool foreach_connection(callback_t cb); + size_t get_connections_count(); + + async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE) + {} +}; + + +/************************************************************************/ +/* */ +/************************************************************************/ +template<class t_connection_context = net_utils::connection_context_base> +class async_protocol_handler +{ +public: + typedef t_connection_context connection_context; + typedef async_protocol_handler_config<t_connection_context> config_type; + + enum stream_state + { + stream_state_head, + stream_state_body + }; + + std::atomic<bool> m_deletion_initiated; + std::atomic<bool> m_protocol_released; + volatile uint32_t m_invoke_buf_ready; + + volatile int m_invoke_result_code; + + critical_section m_local_inv_buff_lock; + std::string m_local_inv_buff; + + critical_section m_send_lock; + critical_section m_call_lock; + + volatile uint32_t m_wait_count; + volatile uint32_t m_close_called; + bucket_head2 m_current_head; + net_utils::i_service_endpoint* m_pservice_endpoint; + config_type& m_config; + t_connection_context& m_connection_context; + + std::string m_cache_in_buffer; + stream_state m_state; + + boost::int32_t m_oponent_protocol_ver; + bool m_connection_initialized; + + struct invoke_response_handler_base + { + virtual bool handle(int res, const std::string& buff, connection_context& context)=0; + virtual bool is_timer_started() const=0; + virtual void cancel()=0; + virtual bool cancel_timer()=0; + }; + template <class callback_t> + struct anvoke_handler: invoke_response_handler_base + { + anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command) + :m_cb(cb), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false), + m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command) + { + if(m_con.start_outer_call()) + { + m_timer.expires_from_now(boost::posix_time::milliseconds(timeout)); + m_timer.async_wait([&con, command, cb](const boost::system::error_code& ec) + { + if(ec == boost::asio::error::operation_aborted) + return; + LOG_PRINT_CC(con.get_context_ref(), "Timeout on invoke operation happened, command: " << command, LOG_LEVEL_2); + std::string fake; + cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref()); + con.close(); + con.finish_outer_call(); + }); + m_timer_started = true; + } + } + virtual ~anvoke_handler() + {} + callback_t m_cb; + async_protocol_handler& m_con; + boost::asio::deadline_timer m_timer; + bool m_timer_started; + bool m_cancel_timer_called; + bool m_timer_cancelled; + int m_command; + virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context) + { + if(!cancel_timer()) + return false; + m_cb(res, buff, context); + m_con.finish_outer_call(); + return true; + } + virtual bool is_timer_started() const + { + return m_timer_started; + } + virtual void cancel() + { + if(cancel_timer()) + { + std::string fake; + m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref()); + m_con.finish_outer_call(); + } + } + virtual bool cancel_timer() + { + if(!m_cancel_timer_called) + { + m_cancel_timer_called = true; + boost::system::error_code ignored_ec; + m_timer_cancelled = 1 == m_timer.cancel(ignored_ec); + } + return m_timer_cancelled; + } + }; + critical_section m_invoke_response_handlers_lock; + std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers; + + template<class callback_t> + bool add_invoke_response_handler(callback_t cb, uint64_t timeout, async_protocol_handler& con, int command) + { + CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock); + boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command)); + m_invoke_response_handlers.push_back(handler); + return handler->is_timer_started(); + } + template<class callback_t> friend struct anvoke_handler; +public: + async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr, + config_type& config, + t_connection_context& conn_context): + m_current_head(bucket_head2()), + m_pservice_endpoint(psnd_hndlr), + m_config(config), + m_connection_context(conn_context), + m_state(stream_state_head) + { + m_close_called = 0; + m_deletion_initiated = false; + m_protocol_released = false; + m_wait_count = 0; + m_oponent_protocol_ver = 0; + m_connection_initialized = false; + } + virtual ~async_protocol_handler() + { + m_deletion_initiated = true; + if(m_connection_initialized) + { + m_config.del_connection(this); + } + + for (size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&m_wait_count); ++i) + { + misc_utils::sleep_no_w(100); + } + CHECK_AND_ASSERT_MES_NO_RET(0 == boost::interprocess::ipcdetail::atomic_read32(&m_wait_count), "Failed to wait for operation completion. m_wait_count = " << m_wait_count); + + LOG_PRINT_CC(m_connection_context, "~async_protocol_handler()", LOG_LEVEL_4); + } + + bool start_outer_call() + { + LOG_PRINT_CC_L4(m_connection_context, "[levin_protocol] -->> start_outer_call"); + if(!m_pservice_endpoint->add_ref()) + { + LOG_PRINT_CC_RED(m_connection_context, "[levin_protocol] -->> start_outer_call failed", LOG_LEVEL_4); + return false; + } + boost::interprocess::ipcdetail::atomic_inc32(&m_wait_count); + return true; + } + bool finish_outer_call() + { + LOG_PRINT_CC_L4(m_connection_context, "[levin_protocol] <<-- finish_outer_call"); + boost::interprocess::ipcdetail::atomic_dec32(&m_wait_count); + m_pservice_endpoint->release(); + return true; + } + + bool release_protocol() + { + decltype(m_invoke_response_handlers) local_invoke_response_handlers; + CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); + local_invoke_response_handlers.swap(m_invoke_response_handlers); + m_protocol_released = true; + CRITICAL_REGION_END(); + + // Never call callback inside critical section, that can cause deadlock. Callback can be called when + // invoke_response_handler_base is cancelled + std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) { + pinv_resp_hndlr->cancel(); + }); + + return true; + } + + bool close() + { + boost::interprocess::ipcdetail::atomic_inc32(&m_close_called); + + m_pservice_endpoint->close(); + return true; + } + + void update_connection_context(const connection_context& contxt) + { + m_connection_context = contxt; + } + + void request_callback() + { + misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this)); + + m_pservice_endpoint->request_callback(); + } + + void handle_qued_callback() + { + m_config.m_pcommands_handler->callback(m_connection_context); + } + + virtual bool handle_recv(const void* ptr, size_t cb) + { + if(boost::interprocess::ipcdetail::atomic_read32(&m_close_called)) + return false; //closing connections + + if(!m_config.m_pcommands_handler) + { + LOG_ERROR_CC(m_connection_context, "Commands handler not set!"); + return false; + } + + if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + { + LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size + << ", packet received " << m_cache_in_buffer.size() + cb + << ", connection will be closed."); + return false; + } + + m_cache_in_buffer.append((const char*)ptr, cb); + + bool is_continue = true; + while(is_continue) + { + switch(m_state) + { + case stream_state_body: + if(m_cache_in_buffer.size() < m_current_head.m_cb) + { + is_continue = false; + break; + } + { + std::string buff_to_invoke; + if(m_cache_in_buffer.size() == m_current_head.m_cb) + buff_to_invoke.swap(m_cache_in_buffer); + else + { + buff_to_invoke.assign(m_cache_in_buffer, 0, (std::string::size_type)m_current_head.m_cb); + m_cache_in_buffer.erase(0, (std::string::size_type)m_current_head.m_cb); + } + + bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); + + LOG_PRINT_CC_L4(m_connection_context, "LEVIN_PACKET_RECIEVED. [len=" << m_current_head.m_cb + << ", flags" << m_current_head.m_flags + << ", r?=" << m_current_head.m_have_to_return_data + <<", cmd = " << m_current_head.m_command + << ", v=" << m_current_head.m_protocol_version); + + if(is_response) + {//response to some invoke + + epee::critical_region_t<decltype(m_invoke_response_handlers_lock)> invoke_response_handlers_guard(m_invoke_response_handlers_lock); + if(!m_invoke_response_handlers.empty()) + {//async call scenario + boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front(); + bool timer_cancelled = response_handler->cancel_timer(); + // Don't pop handler, to avoid destroying it + if(timer_cancelled) + m_invoke_response_handlers.pop_front(); + invoke_response_handlers_guard.unlock(); + + if(timer_cancelled) + response_handler->handle(m_current_head.m_command, buff_to_invoke, m_connection_context); + } + else + { + invoke_response_handlers_guard.unlock(); + //use sync call scenario + if(!boost::interprocess::ipcdetail::atomic_read32(&m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&m_close_called)) + { + LOG_ERROR_CC(m_connection_context, "no active invoke when response came, wtf?"); + return false; + }else + { + CRITICAL_REGION_BEGIN(m_local_inv_buff_lock); + buff_to_invoke.swap(m_local_inv_buff); + buff_to_invoke.clear(); + m_invoke_result_code = m_current_head.m_return_code; + CRITICAL_REGION_END(); + boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 1); + } + } + }else + { + if(m_current_head.m_have_to_return_data) + { + std::string return_buff; + m_current_head.m_return_code = m_config.m_pcommands_handler->invoke( + m_current_head.m_command, + buff_to_invoke, + return_buff, + m_connection_context); + m_current_head.m_cb = return_buff.size(); + m_current_head.m_have_to_return_data = false; + m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1; + m_current_head.m_flags = LEVIN_PACKET_RESPONSE; + std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); + send_buff += return_buff; + CRITICAL_REGION_BEGIN(m_send_lock); + if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size())) + return false; + CRITICAL_REGION_END(); + LOG_PRINT_CC_L4(m_connection_context, "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb + << ", flags" << m_current_head.m_flags + << ", r?=" << m_current_head.m_have_to_return_data + <<", cmd = " << m_current_head.m_command + << ", ver=" << m_current_head.m_protocol_version); + } + else + m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); + } + } + m_state = stream_state_head; + break; + case stream_state_head: + { + if(m_cache_in_buffer.size() < sizeof(bucket_head2)) + { + if(m_cache_in_buffer.size() >= sizeof(boost::uint64_t) && *((boost::uint64_t*)m_cache_in_buffer.data()) != LEVIN_SIGNATURE) + { + LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed"); + return false; + } + is_continue = false; + break; + } + + bucket_head2* phead = (bucket_head2*)m_cache_in_buffer.data(); + if(LEVIN_SIGNATURE != phead->m_signature) + { + LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed"); + return false; + } + m_current_head = *phead; + + m_cache_in_buffer.erase(0, sizeof(bucket_head2)); + m_state = stream_state_body; + m_oponent_protocol_ver = m_current_head.m_protocol_version; + if(m_current_head.m_cb > m_config.m_max_packet_size) + { + LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size + << ", packet header received " << m_current_head.m_cb + << ", connection will be closed."); + return false; + } + } + break; + default: + LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state); + return false; + } + } + + return true; + } + + bool after_init_connection() + { + if (!m_connection_initialized) + { + m_connection_initialized = true; + m_config.add_connection(this); + } + return true; + } + + template<class callback_t> + bool async_invoke(int command, const std::string& in_buff, callback_t cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + { + misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this)); + + if(timeout == LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + timeout = m_config.m_invoke_timeout; + + int err_code = LEVIN_OK; + do + { + if(m_deletion_initiated) + { + err_code = LEVIN_ERROR_CONNECTION_DESTROYED; + break; + } + + CRITICAL_REGION_LOCAL(m_call_lock); + + if(m_deletion_initiated) + { + err_code = LEVIN_ERROR_CONNECTION_DESTROYED; + break; + } + + bucket_head2 head = {0}; + head.m_signature = LEVIN_SIGNATURE; + head.m_cb = in_buff.size(); + head.m_have_to_return_data = true; + + head.m_flags = LEVIN_PACKET_REQUEST; + head.m_command = command; + head.m_protocol_version = LEVIN_PROTOCOL_VER_1; + + boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); + CRITICAL_REGION_BEGIN(m_send_lock); + CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock); + if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + { + LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + err_code = LEVIN_ERROR_CONNECTION; + break; + } + + if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size())) + { + LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + err_code = LEVIN_ERROR_CONNECTION; + break; + } + + if(!add_invoke_response_handler(cb, timeout, *this, command)) + { + err_code = LEVIN_ERROR_CONNECTION_DESTROYED; + break; + } + CRITICAL_REGION_END(); + } while (false); + + if (LEVIN_OK != err_code) + { + std::string stub_buff; + // Never call callback inside critical section, that can cause deadlock + cb(err_code, stub_buff, m_connection_context); + return false; + } + + return true; + } + + int invoke(int command, const std::string& in_buff, std::string& buff_out) + { + misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this)); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + CRITICAL_REGION_LOCAL(m_call_lock); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + bucket_head2 head = {0}; + head.m_signature = LEVIN_SIGNATURE; + head.m_cb = in_buff.size(); + head.m_have_to_return_data = true; + + head.m_flags = LEVIN_PACKET_REQUEST; + head.m_command = command; + head.m_protocol_version = LEVIN_PROTOCOL_VER_1; + + boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); + CRITICAL_REGION_BEGIN(m_send_lock); + if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + { + LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + return LEVIN_ERROR_CONNECTION; + } + + if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size())) + { + LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + return LEVIN_ERROR_CONNECTION; + } + CRITICAL_REGION_END(); + + LOG_PRINT_CC_L4(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", f=" << head.m_flags + << ", r?=" << head.m_have_to_return_data + << ", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + + uint64_t ticks_start = misc_utils::get_tick_count(); + + while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released) + { + if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout) + { + LOG_PRINT_CC_L2(m_connection_context, "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection "); + close(); + return LEVIN_ERROR_CONNECTION_TIMEDOUT; + } + if(!m_pservice_endpoint->call_run_once_service_io()) + return LEVIN_ERROR_CONNECTION_DESTROYED; + } + + if(m_deletion_initiated || m_protocol_released) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + CRITICAL_REGION_BEGIN(m_local_inv_buff_lock); + buff_out.swap(m_local_inv_buff); + m_local_inv_buff.clear(); + CRITICAL_REGION_END(); + + return m_invoke_result_code; + } + + int notify(int command, const std::string& in_buff) + { + misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this)); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + CRITICAL_REGION_LOCAL(m_call_lock); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + bucket_head2 head = {0}; + head.m_signature = LEVIN_SIGNATURE; + head.m_have_to_return_data = false; + head.m_cb = in_buff.size(); + + head.m_command = command; + head.m_protocol_version = LEVIN_PROTOCOL_VER_1; + head.m_flags = LEVIN_PACKET_REQUEST; + CRITICAL_REGION_BEGIN(m_send_lock); + if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + { + LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + return -1; + } + + if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size())) + { + LOG_ERROR("Failed to do_send()"); + return -1; + } + CRITICAL_REGION_END(); + LOG_PRINT_CC_L4(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb << + ", f=" << head.m_flags << + ", r?=" << head.m_have_to_return_data << + ", cmd = " << head.m_command << + ", ver=" << head.m_protocol_version); + + return 1; + } + //------------------------------------------------------------------------------------------ + boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;} + //------------------------------------------------------------------------------------------ + t_connection_context& get_context_ref() {return m_connection_context;} +}; +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn) +{ + CRITICAL_REGION_BEGIN(m_connects_lock); + m_connects.erase(pconn->get_connection_id()); + CRITICAL_REGION_END(); + m_pcommands_handler->on_connection_close(pconn->m_connection_context); +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn) +{ + CRITICAL_REGION_BEGIN(m_connects_lock); + m_connects[pconn->get_connection_id()] = pconn; + CRITICAL_REGION_END(); + m_pcommands_handler->on_connection_new(pconn->m_connection_context); +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(boost::uuids::uuid connection_id) const +{ + auto it = m_connects.find(connection_id); + return it == m_connects.end() ? 0 : it->second; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph) +{ + CRITICAL_REGION_LOCAL(m_connects_lock); + aph = find_connection(connection_id); + if(0 == aph) + return LEVIN_ERROR_CONNECTION_NOT_FOUND; + if(!aph->start_outer_call()) + return LEVIN_ERROR_CONNECTION_DESTROYED; + return LEVIN_OK; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> template<class callback_t> +int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, callback_t cb, size_t timeout) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> template<class callback_t> +bool async_protocol_handler_config<t_connection_context>::foreach_connection(callback_t cb) +{ + CRITICAL_REGION_LOCAL(m_connects_lock); + for(auto& c: m_connects) + { + async_protocol_handler<t_connection_context>* aph = c.second; + if(!cb(aph->get_context_ref())) + return false; + } + return true; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +size_t async_protocol_handler_config<t_connection_context>::get_connections_count() +{ + CRITICAL_REGION_LOCAL(m_connects_lock); + return m_connects.size(); +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->notify(command, in_buff) : r; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id) +{ + CRITICAL_REGION_LOCAL(m_connects_lock); + async_protocol_handler<t_connection_context>* aph = find_connection(connection_id); + return 0 != aph ? aph->close() : false; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +bool async_protocol_handler_config<t_connection_context>::update_connection_context(const t_connection_context& contxt) +{ + CRITICAL_REGION_LOCAL(m_connects_lock); + async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id); + if(0 == aph) + return false; + aph->update_connection_context(contxt); + return true; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> +bool async_protocol_handler_config<t_connection_context>::request_callback(boost::uuids::uuid connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + if(LEVIN_OK == r) + { + aph->request_callback(); + return true; + } + else + { + return false; + } +} +} +} |