diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/rpc/bootstrap_daemon.cpp | 95 | ||||
-rw-r--r-- | src/rpc/bootstrap_daemon.h | 67 | ||||
-rw-r--r-- | src/rpc/core_rpc_server.cpp | 139 | ||||
-rw-r--r-- | src/rpc/core_rpc_server.h | 7 | ||||
-rw-r--r-- | src/rpc/core_rpc_server_commands_defs.h | 60 | ||||
-rw-r--r-- | src/rpc/zmq_server.cpp | 136 | ||||
-rw-r--r-- | src/rpc/zmq_server.h | 20 |
8 files changed, 362 insertions, 166 deletions
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 06577d37e..116e7f568 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -26,10 +26,13 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +include_directories(SYSTEM ${ZMQ_INCLUDE_PATH}) + set(rpc_base_sources rpc_args.cpp) set(rpc_sources + bootstrap_daemon.cpp core_rpc_server.cpp rpc_handler.cpp instanciations) @@ -53,6 +56,7 @@ set(daemon_rpc_server_headers) set(rpc_daemon_private_headers + bootstrap_daemon.h core_rpc_server.h core_rpc_server_commands_defs.h core_rpc_server_error_codes.h) diff --git a/src/rpc/bootstrap_daemon.cpp b/src/rpc/bootstrap_daemon.cpp new file mode 100644 index 000000000..c97b2c95a --- /dev/null +++ b/src/rpc/bootstrap_daemon.cpp @@ -0,0 +1,95 @@ +#include "bootstrap_daemon.h" + +#include <stdexcept> + +#include "crypto/crypto.h" +#include "cryptonote_core/cryptonote_core.h" +#include "misc_log_ex.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "daemon.rpc.bootstrap_daemon" + +namespace cryptonote +{ + + bootstrap_daemon::bootstrap_daemon(std::function<boost::optional<std::string>()> get_next_public_node) + : m_get_next_public_node(get_next_public_node) + { + } + + bootstrap_daemon::bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials) + : bootstrap_daemon(nullptr) + { + if (!set_server(address, credentials)) + { + throw std::runtime_error("invalid bootstrap daemon address or credentials"); + } + } + + std::string bootstrap_daemon::address() const noexcept + { + const auto& host = m_http_client.get_host(); + if (host.empty()) + { + return std::string(); + } + return host + ":" + m_http_client.get_port(); + } + + boost::optional<uint64_t> bootstrap_daemon::get_height() + { + cryptonote::COMMAND_RPC_GET_HEIGHT::request req; + cryptonote::COMMAND_RPC_GET_HEIGHT::response res; + + if (!invoke_http_json("/getheight", req, res)) + { + return boost::none; + } + + if (res.status != CORE_RPC_STATUS_OK) + { + return boost::none; + } + + return res.height; + } + + bool bootstrap_daemon::handle_result(bool success) + { + if (!success && m_get_next_public_node) + { + m_http_client.disconnect(); + } + + return success; + } + + bool bootstrap_daemon::set_server(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials /* = boost::none */) + { + if (!m_http_client.set_server(address, credentials)) + { + MERROR("Failed to set bootstrap daemon address " << address); + return false; + } + + MINFO("Changed bootstrap daemon address to " << address); + return true; + } + + + bool bootstrap_daemon::switch_server_if_needed() + { + if (!m_get_next_public_node || m_http_client.is_connected()) + { + return true; + } + + const boost::optional<std::string> address = m_get_next_public_node(); + if (address) { + return set_server(*address); + } + + return false; + } + +} diff --git a/src/rpc/bootstrap_daemon.h b/src/rpc/bootstrap_daemon.h new file mode 100644 index 000000000..6276b1b21 --- /dev/null +++ b/src/rpc/bootstrap_daemon.h @@ -0,0 +1,67 @@ +#pragma once + +#include <functional> +#include <vector> + +#include <boost/optional/optional.hpp> +#include <boost/utility/string_ref.hpp> + +#include "net/http_client.h" +#include "storages/http_abstract_invoke.h" + +namespace cryptonote +{ + + class bootstrap_daemon + { + public: + bootstrap_daemon(std::function<boost::optional<std::string>()> get_next_public_node); + bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials); + + std::string address() const noexcept; + boost::optional<uint64_t> get_height(); + bool handle_result(bool success); + + template <class t_request, class t_response> + bool invoke_http_json(const boost::string_ref uri, const t_request &out_struct, t_response &result_struct) + { + if (!switch_server_if_needed()) + { + return false; + } + + return handle_result(epee::net_utils::invoke_http_json(uri, out_struct, result_struct, m_http_client)); + } + + template <class t_request, class t_response> + bool invoke_http_bin(const boost::string_ref uri, const t_request &out_struct, t_response &result_struct) + { + if (!switch_server_if_needed()) + { + return false; + } + + return handle_result(epee::net_utils::invoke_http_bin(uri, out_struct, result_struct, m_http_client)); + } + + template <class t_request, class t_response> + bool invoke_http_json_rpc(const boost::string_ref command_name, const t_request &out_struct, t_response &result_struct) + { + if (!switch_server_if_needed()) + { + return false; + } + + return handle_result(epee::net_utils::invoke_http_json_rpc("/json_rpc", std::string(command_name.begin(), command_name.end()), out_struct, result_struct, m_http_client)); + } + + private: + bool set_server(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials = boost::none); + bool switch_server_if_needed(); + + private: + epee::net_utils::http::http_simple_client m_http_client; + std::function<boost::optional<std::string>()> m_get_next_public_node; + }; + +} diff --git a/src/rpc/core_rpc_server.cpp b/src/rpc/core_rpc_server.cpp index 1fc4b816f..66af4a364 100644 --- a/src/rpc/core_rpc_server.cpp +++ b/src/rpc/core_rpc_server.cpp @@ -103,6 +103,7 @@ namespace cryptonote ) : m_core(cr) , m_p2p(p2p) + , m_was_bootstrap_ever_used(false) {} //------------------------------------------------------------------------------------------------------------------------------ bool core_rpc_server::set_bootstrap_daemon(const std::string &address, const std::string &username_password) @@ -116,20 +117,59 @@ namespace cryptonote return set_bootstrap_daemon(address, credentials); } //------------------------------------------------------------------------------------------------------------------------------ + boost::optional<std::string> core_rpc_server::get_random_public_node() + { + COMMAND_RPC_GET_PUBLIC_NODES::request request; + COMMAND_RPC_GET_PUBLIC_NODES::response response; + + request.gray = true; + request.white = true; + if (!on_get_public_nodes(request, response) || response.status != CORE_RPC_STATUS_OK) + { + return boost::none; + } + + const auto get_random_node_address = [](const std::vector<public_node>& public_nodes) -> std::string { + const auto& random_node = public_nodes[crypto::rand_idx(public_nodes.size())]; + const auto address = random_node.host + ":" + std::to_string(random_node.rpc_port); + return address; + }; + + if (!response.white.empty()) + { + return get_random_node_address(response.white); + } + + MDEBUG("No white public node found, checking gray peers"); + + if (!response.gray.empty()) + { + return get_random_node_address(response.gray); + } + + MERROR("Failed to find any suitable public node"); + + return boost::none; + } + //------------------------------------------------------------------------------------------------------------------------------ bool core_rpc_server::set_bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials) { boost::unique_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex); - if (!address.empty()) + if (address.empty()) { - if (!m_http_client.set_server(address, credentials, epee::net_utils::ssl_support_t::e_ssl_support_autodetect)) - { - return false; - } + m_bootstrap_daemon.reset(nullptr); + } + else if (address == "auto") + { + m_bootstrap_daemon.reset(new bootstrap_daemon([this]{ return get_random_public_node(); })); + } + else + { + m_bootstrap_daemon.reset(new bootstrap_daemon(address, credentials)); } - m_bootstrap_daemon_address = address; - m_should_use_bootstrap_daemon = !m_bootstrap_daemon_address.empty(); + m_should_use_bootstrap_daemon = m_bootstrap_daemon.get() != nullptr; return true; } @@ -220,7 +260,10 @@ namespace cryptonote { { boost::shared_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex); - res.bootstrap_daemon_address = m_bootstrap_daemon_address; + if (m_bootstrap_daemon.get() != nullptr) + { + res.bootstrap_daemon_address = m_bootstrap_daemon->address(); + } } crypto::hash top_hash; m_core.get_blockchain_top(res.height_without_bootstrap, top_hash); @@ -269,7 +312,10 @@ namespace cryptonote else { boost::shared_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex); - res.bootstrap_daemon_address = m_bootstrap_daemon_address; + if (m_bootstrap_daemon.get() != nullptr) + { + res.bootstrap_daemon_address = m_bootstrap_daemon->address(); + } res.was_bootstrap_ever_used = m_was_bootstrap_ever_used; } res.database_size = m_core.get_blockchain_storage().get_db().get_database_size(); @@ -1004,7 +1050,8 @@ namespace cryptonote res.block_reward = lMiner.get_block_reward(); } const account_public_address& lMiningAdr = lMiner.get_mining_address(); - res.address = get_account_address_as_str(nettype(), false, lMiningAdr); + if (lMiner.is_mining() || lMiner.get_is_background_mining_enabled()) + res.address = get_account_address_as_str(nettype(), false, lMiningAdr); const uint8_t major_version = m_core.get_blockchain_storage().get_current_hard_fork_version(); const unsigned variant = major_version >= 7 ? major_version - 6 : 0; switch (variant) @@ -1013,6 +1060,7 @@ namespace cryptonote case 1: res.pow_algorithm = "CNv1 (Cryptonight variant 1)"; break; case 2: case 3: res.pow_algorithm = "CNv2 (Cryptonight variant 2)"; break; case 4: case 5: res.pow_algorithm = "CNv4 (Cryptonight variant 4)"; break; + case 6: res.pow_algorithm = "RandomX"; break; default: res.pow_algorithm = "I'm not sure actually"; break; } if (res.is_background_mining_enabled) @@ -1393,6 +1441,18 @@ namespace cryptonote LOG_ERROR("Failed to create block template"); return false; } + if (b.major_version >= RX_BLOCK_VERSION) + { + uint64_t seed_height, next_height; + crypto::hash seed_hash; + crypto::rx_seedheights(res.height, &seed_height, &next_height); + seed_hash = m_core.get_block_id_by_height(seed_height); + res.seed_hash = string_tools::pod_to_hex(seed_hash); + if (next_height != seed_height) { + seed_hash = m_core.get_block_id_by_height(next_height); + res.next_seed_hash = string_tools::pod_to_hex(seed_hash); + } + } store_difficulty(wdiff, res.difficulty, res.wide_difficulty, res.difficulty_top64); blobdata block_blob = t_serializable_object_to_blob(b); crypto::public_key tx_pub_key = cryptonote::get_tx_pub_key_from_extra(b.miner_tx); @@ -1509,7 +1569,7 @@ namespace cryptonote template_req.reserve_size = 1; template_req.wallet_address = req.wallet_address; template_req.prev_block = req.prev_block; - submit_req.push_back(boost::value_initialized<std::string>()); + submit_req.push_back(std::string{}); res.height = m_core.get_blockchain_storage().get_current_blockchain_height(); for(size_t i = 0; i < req.amount_of_blocks; i++) @@ -1535,7 +1595,7 @@ namespace cryptonote return false; } b.nonce = req.starting_nonce; - miner::find_nonce_for_given_block(b, template_res.difficulty, template_res.height); + miner::find_nonce_for_given_block(&(m_core.get_blockchain_storage()), b, template_res.difficulty, template_res.height); submit_req.front() = string_tools::buff_to_hex_nodelimer(block_to_blob(b)); r = on_submitblock(submit_req, submit_res, error_resp, ctx); @@ -1580,7 +1640,7 @@ namespace cryptonote response.reward = get_block_reward(blk); response.block_size = response.block_weight = m_core.get_blockchain_storage().get_db().get_block_weight(height); response.num_txes = blk.tx_hashes.size(); - response.pow_hash = fill_pow_hash ? string_tools::pod_to_hex(get_block_longhash(blk, height)) : ""; + response.pow_hash = fill_pow_hash ? string_tools::pod_to_hex(get_block_longhash(&(m_core.get_blockchain_storage()), blk, height, 0)) : ""; response.long_term_weight = m_core.get_blockchain_storage().get_db().get_block_long_term_weight(height); response.miner_tx_hash = string_tools::pod_to_hex(cryptonote::get_transaction_hash(blk.miner_tx)); return true; @@ -1593,8 +1653,10 @@ namespace cryptonote boost::upgrade_lock<boost::shared_mutex> upgrade_lock(m_bootstrap_daemon_mutex); - if (m_bootstrap_daemon_address.empty()) + if (m_bootstrap_daemon.get() == nullptr) + { return false; + } if (!m_should_use_bootstrap_daemon) { @@ -1610,42 +1672,38 @@ namespace cryptonote m_bootstrap_height_check_time = current_time; } - uint64_t top_height; - crypto::hash top_hash; - m_core.get_blockchain_top(top_height, top_hash); - ++top_height; // turn top block height into blockchain height + boost::optional<uint64_t> bootstrap_daemon_height = m_bootstrap_daemon->get_height(); + if (!bootstrap_daemon_height) + { + MERROR("Failed to fetch bootstrap daemon height"); + return false; + } - // query bootstrap daemon's height - cryptonote::COMMAND_RPC_GET_HEIGHT::request getheight_req; - cryptonote::COMMAND_RPC_GET_HEIGHT::response getheight_res; - bool ok = epee::net_utils::invoke_http_json("/getheight", getheight_req, getheight_res, m_http_client); - ok = ok && getheight_res.status == CORE_RPC_STATUS_OK; + uint64_t target_height = m_core.get_target_blockchain_height(); + if (*bootstrap_daemon_height < target_height) + { + MINFO("Bootstrap daemon is out of sync"); + return m_bootstrap_daemon->handle_result(false); + } - m_should_use_bootstrap_daemon = ok && top_height + 10 < getheight_res.height; - MINFO((m_should_use_bootstrap_daemon ? "Using" : "Not using") << " the bootstrap daemon (our height: " << top_height << ", bootstrap daemon's height: " << (ok ? getheight_res.height : 0) << ")"); + uint64_t top_height = m_core.get_current_blockchain_height(); + m_should_use_bootstrap_daemon = top_height + 10 < *bootstrap_daemon_height; + MINFO((m_should_use_bootstrap_daemon ? "Using" : "Not using") << " the bootstrap daemon (our height: " << top_height << ", bootstrap daemon's height: " << *bootstrap_daemon_height << ")"); } if (!m_should_use_bootstrap_daemon) return false; if (mode == invoke_http_mode::JON) { - r = epee::net_utils::invoke_http_json(command_name, req, res, m_http_client); + r = m_bootstrap_daemon->invoke_http_json(command_name, req, res); } else if (mode == invoke_http_mode::BIN) { - r = epee::net_utils::invoke_http_bin(command_name, req, res, m_http_client); + r = m_bootstrap_daemon->invoke_http_bin(command_name, req, res); } else if (mode == invoke_http_mode::JON_RPC) { - epee::json_rpc::request<typename COMMAND_TYPE::request> json_req = AUTO_VAL_INIT(json_req); - epee::json_rpc::response<typename COMMAND_TYPE::response, std::string> json_resp = AUTO_VAL_INIT(json_resp); - json_req.jsonrpc = "2.0"; - json_req.id = epee::serialization::storage_entry(0); - json_req.method = command_name; - json_req.params = req; - r = net_utils::invoke_http_json("/json_rpc", json_req, json_resp, m_http_client); - if (r) - res = json_resp.result; + r = m_bootstrap_daemon->invoke_http_json_rpc(command_name, req, res); } else { @@ -1940,12 +1998,12 @@ namespace cryptonote PERF_TIMER(on_get_bans); auto now = time(nullptr); - std::map<epee::net_utils::network_address, time_t> blocked_hosts = m_p2p.get_blocked_hosts(); - for (std::map<epee::net_utils::network_address, time_t>::const_iterator i = blocked_hosts.begin(); i != blocked_hosts.end(); ++i) + std::map<std::string, time_t> blocked_hosts = m_p2p.get_blocked_hosts(); + for (std::map<std::string, time_t>::const_iterator i = blocked_hosts.begin(); i != blocked_hosts.end(); ++i) { if (i->second > now) { COMMAND_RPC_GETBANS::ban b; - b.host = i->first.host_str(); + b.host = i->first; b.ip = 0; uint32_t ip; if (epee::string_tools::get_ip_int32_from_string(ip, b.host)) @@ -2618,7 +2676,8 @@ namespace cryptonote const command_line::arg_descriptor<std::string> core_rpc_server::arg_bootstrap_daemon_address = { "bootstrap-daemon-address" - , "URL of a 'bootstrap' remote daemon that the connected wallets can use while this daemon is still not fully synced" + , "URL of a 'bootstrap' remote daemon that the connected wallets can use while this daemon is still not fully synced.\n" + "Use 'auto' to enable automatic public nodes discovering and bootstrap daemon switching" , "" }; diff --git a/src/rpc/core_rpc_server.h b/src/rpc/core_rpc_server.h index e91d4c953..379f6ed28 100644 --- a/src/rpc/core_rpc_server.h +++ b/src/rpc/core_rpc_server.h @@ -30,9 +30,12 @@ #pragma once +#include <memory> + #include <boost/program_options/options_description.hpp> #include <boost/program_options/variables_map.hpp> +#include "bootstrap_daemon.h" #include "net/http_server_impl_base.h" #include "net/http_client.h" #include "core_rpc_server_commands_defs.h" @@ -243,6 +246,7 @@ private: //utils uint64_t get_block_reward(const block& blk); bool fill_block_header_response(const block& blk, bool orphan_status, uint64_t height, const crypto::hash& hash, block_header_response& response, bool fill_pow_hash); + boost::optional<std::string> get_random_public_node(); bool set_bootstrap_daemon(const std::string &address, const std::string &username_password); bool set_bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials); enum invoke_http_mode { JON, BIN, JON_RPC }; @@ -251,9 +255,8 @@ private: core& m_core; nodetool::node_server<cryptonote::t_cryptonote_protocol_handler<cryptonote::core> >& m_p2p; - std::string m_bootstrap_daemon_address; - epee::net_utils::http::http_simple_client m_http_client; boost::shared_mutex m_bootstrap_daemon_mutex; + std::unique_ptr<bootstrap_daemon> m_bootstrap_daemon; bool m_should_use_bootstrap_daemon; std::chrono::system_clock::time_point m_bootstrap_height_check_time; bool m_was_bootstrap_ever_used; diff --git a/src/rpc/core_rpc_server_commands_defs.h b/src/rpc/core_rpc_server_commands_defs.h index 325ac4343..2760260f6 100644 --- a/src/rpc/core_rpc_server_commands_defs.h +++ b/src/rpc/core_rpc_server_commands_defs.h @@ -86,8 +86,8 @@ namespace cryptonote // whether they can talk to a given daemon without having to know in // advance which version they will stop working with // Don't go over 32767 for any of these -#define CORE_RPC_VERSION_MAJOR 2 -#define CORE_RPC_VERSION_MINOR 10 +#define CORE_RPC_VERSION_MAJOR 3 +#define CORE_RPC_VERSION_MINOR 0 #define MAKE_CORE_RPC_VERSION(major,minor) (((major)<<16)|(minor)) #define CORE_RPC_VERSION MAKE_CORE_RPC_VERSION(CORE_RPC_VERSION_MAJOR, CORE_RPC_VERSION_MINOR) @@ -255,56 +255,6 @@ namespace cryptonote }; typedef epee::misc_utils::struct_init<response_t> response; }; - - //----------------------------------------------- - struct COMMAND_RPC_GET_RANDOM_OUTS - { - struct request_t - { - std::vector<std::string> amounts; - uint32_t count; - - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(amounts) - KV_SERIALIZE(count) - END_KV_SERIALIZE_MAP() - }; - typedef epee::misc_utils::struct_init<request_t> request; - - - struct output { - std::string public_key; - uint64_t global_index; - std::string rct; // 64+64+64 characters long (<rct commit> + <encrypted mask> + <rct amount>) - - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(public_key) - KV_SERIALIZE(global_index) - KV_SERIALIZE(rct) - END_KV_SERIALIZE_MAP() - }; - - struct amount_out { - uint64_t amount; - std::vector<output> outputs; - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(amount) - KV_SERIALIZE(outputs) - END_KV_SERIALIZE_MAP() - - }; - - struct response_t - { - std::vector<amount_out> amount_outs; - std::string Error; - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(amount_outs) - KV_SERIALIZE(Error) - END_KV_SERIALIZE_MAP() - }; - typedef epee::misc_utils::struct_init<response_t> response; - }; //----------------------------------------------- struct COMMAND_RPC_SUBMIT_RAW_TX { @@ -944,6 +894,8 @@ namespace cryptonote uint64_t reserved_offset; uint64_t expected_reward; std::string prev_hash; + std::string seed_hash; + std::string next_seed_hash; blobdata blocktemplate_blob; blobdata blockhashing_blob; std::string status; @@ -961,6 +913,8 @@ namespace cryptonote KV_SERIALIZE(blockhashing_blob) KV_SERIALIZE(status) KV_SERIALIZE(untrusted) + KV_SERIALIZE(seed_hash) + KV_SERIALIZE(next_seed_hash) END_KV_SERIALIZE_MAP() }; typedef epee::misc_utils::struct_init<response_t> response; @@ -1566,7 +1520,7 @@ namespace cryptonote KV_SERIALIZE(num_10m) KV_SERIALIZE(num_not_relayed) KV_SERIALIZE(histo_98pc) - KV_SERIALIZE_CONTAINER_POD_AS_BLOB(histo) + KV_SERIALIZE(histo) KV_SERIALIZE(num_double_spends) END_KV_SERIALIZE_MAP() }; diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 668a2e5cd..1ee55673e 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -28,18 +28,29 @@ #include "zmq_server.h" +#include <chrono> +#include <cstdint> +#include <system_error> + namespace cryptonote { +namespace +{ + constexpr const int num_zmq_threads = 1; + constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB + constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages +} + namespace rpc { ZmqServer::ZmqServer(RpcHandler& h) : handler(h), - stop_signal(false), - running(false), - context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable + context(zmq_init(num_zmq_threads)) { + if (!context) + MONERO_ZMQ_THROW("Unable to create ZMQ context"); } ZmqServer::~ZmqServer() @@ -48,71 +59,88 @@ ZmqServer::~ZmqServer() void ZmqServer::serve() { - - while (1) + try { - try + // socket must close before `zmq_term` will exit. + const net::zmq::socket socket = std::move(rep_socket); + if (!socket) { - zmq::message_t message; - - if (!rep_socket) - { - throw std::runtime_error("ZMQ RPC server reply socket is null"); - } - while (rep_socket->recv(&message, 0)) - { - std::string message_string(reinterpret_cast<const char *>(message.data()), message.size()); - - MDEBUG(std::string("Received RPC request: \"") + message_string + "\""); - - std::string response = handler.handle(message_string); - - zmq::message_t reply(response.size()); - memcpy((void *) reply.data(), response.c_str(), response.size()); - - rep_socket->send(reply); - MDEBUG(std::string("Sent RPC reply: \"") + response + "\""); - - } - } - catch (const boost::thread_interrupted& e) - { - MDEBUG("ZMQ Server thread interrupted."); + MERROR("ZMQ RPC server reply socket is null"); + return; } - catch (const zmq::error_t& e) + + while (1) { - MERROR(std::string("ZMQ error: ") + e.what()); + const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get())); + MDEBUG("Received RPC request: \"" << message << "\""); + const std::string& response = handler.handle(message); + + MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get())); + MDEBUG("Sent RPC reply: \"" << response << "\""); } - boost::this_thread::interruption_point(); + } + catch (const std::system_error& e) + { + if (e.code() != net::zmq::make_error_code(ETERM)) + MERROR("ZMQ RPC Server Error: " << e.what()); + } + catch (const std::exception& e) + { + MERROR("ZMQ RPC Server Error: " << e.what()); + } + catch (...) + { + MERROR("Unknown error in ZMQ RPC server"); } } -bool ZmqServer::addIPCSocket(std::string address, std::string port) +bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port) { MERROR("ZmqServer::addIPCSocket not yet implemented!"); return false; } -bool ZmqServer::addTCPSocket(std::string address, std::string port) +bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) { - try + if (!context) { - std::string addr_prefix("tcp://"); + MERROR("ZMQ RPC Server already shutdown"); + return false; + } - rep_socket.reset(new zmq::socket_t(context, ZMQ_REP)); + rep_socket.reset(zmq_socket(context.get(), ZMQ_REP)); + if (!rep_socket) + { + MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed"); + return false; + } - rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS)); + if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); + return false; + } - if (address.empty()) - address = "*"; - if (port.empty()) - port = "*"; - std::string bind_address = addr_prefix + address + std::string(":") + port; - rep_socket->bind(bind_address.c_str()); + static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); + if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); + return false; } - catch (const std::exception& e) + + if (address.empty()) + address = "*"; + if (port.empty()) + port = "*"; + + std::string bind_address = "tcp://"; + bind_address.append(address.data(), address.size()); + bind_address += ":"; + bind_address.append(port.data(), port.size()); + + if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0) { - MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); + MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed"); return false; } return true; @@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port) void ZmqServer::run() { - running = true; run_thread = boost::thread(boost::bind(&ZmqServer::serve, this)); } void ZmqServer::stop() { - if (!running) return; - - stop_signal = true; + if (!run_thread.joinable()) + return; - run_thread.interrupt(); + context.reset(); // destroying context terminates all calls run_thread.join(); - - running = false; - - return; } diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h index 1b1e4c7cf..ce7892dab 100644 --- a/src/rpc/zmq_server.h +++ b/src/rpc/zmq_server.h @@ -29,12 +29,10 @@ #pragma once #include <boost/thread/thread.hpp> -#include <zmq.hpp> -#include <string> -#include <memory> +#include <boost/utility/string_ref.hpp> #include "common/command_line.h" - +#include "net/zmq.h" #include "rpc_handler.h" namespace cryptonote @@ -43,9 +41,6 @@ namespace cryptonote namespace rpc { -static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1; -static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000; - class ZmqServer { public: @@ -58,8 +53,8 @@ class ZmqServer void serve(); - bool addIPCSocket(std::string address, std::string port); - bool addTCPSocket(std::string address, std::string port); + bool addIPCSocket(boost::string_ref address, boost::string_ref port); + bool addTCPSocket(boost::string_ref address, boost::string_ref port); void run(); void stop(); @@ -67,14 +62,11 @@ class ZmqServer private: RpcHandler& handler; - volatile bool stop_signal; - volatile bool running; - - zmq::context_t context; + net::zmq::context context; boost::thread run_thread; - std::unique_ptr<zmq::socket_t> rep_socket; + net::zmq::socket rep_socket; }; |