aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/CMakeLists.txt4
-rw-r--r--src/rpc/bootstrap_daemon.cpp95
-rw-r--r--src/rpc/bootstrap_daemon.h67
-rw-r--r--src/rpc/core_rpc_server.cpp139
-rw-r--r--src/rpc/core_rpc_server.h7
-rw-r--r--src/rpc/core_rpc_server_commands_defs.h60
-rw-r--r--src/rpc/zmq_server.cpp136
-rw-r--r--src/rpc/zmq_server.h20
8 files changed, 362 insertions, 166 deletions
diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt
index 06577d37e..116e7f568 100644
--- a/src/rpc/CMakeLists.txt
+++ b/src/rpc/CMakeLists.txt
@@ -26,10 +26,13 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+include_directories(SYSTEM ${ZMQ_INCLUDE_PATH})
+
set(rpc_base_sources
rpc_args.cpp)
set(rpc_sources
+ bootstrap_daemon.cpp
core_rpc_server.cpp
rpc_handler.cpp
instanciations)
@@ -53,6 +56,7 @@ set(daemon_rpc_server_headers)
set(rpc_daemon_private_headers
+ bootstrap_daemon.h
core_rpc_server.h
core_rpc_server_commands_defs.h
core_rpc_server_error_codes.h)
diff --git a/src/rpc/bootstrap_daemon.cpp b/src/rpc/bootstrap_daemon.cpp
new file mode 100644
index 000000000..c97b2c95a
--- /dev/null
+++ b/src/rpc/bootstrap_daemon.cpp
@@ -0,0 +1,95 @@
+#include "bootstrap_daemon.h"
+
+#include <stdexcept>
+
+#include "crypto/crypto.h"
+#include "cryptonote_core/cryptonote_core.h"
+#include "misc_log_ex.h"
+
+#undef MONERO_DEFAULT_LOG_CATEGORY
+#define MONERO_DEFAULT_LOG_CATEGORY "daemon.rpc.bootstrap_daemon"
+
+namespace cryptonote
+{
+
+ bootstrap_daemon::bootstrap_daemon(std::function<boost::optional<std::string>()> get_next_public_node)
+ : m_get_next_public_node(get_next_public_node)
+ {
+ }
+
+ bootstrap_daemon::bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials)
+ : bootstrap_daemon(nullptr)
+ {
+ if (!set_server(address, credentials))
+ {
+ throw std::runtime_error("invalid bootstrap daemon address or credentials");
+ }
+ }
+
+ std::string bootstrap_daemon::address() const noexcept
+ {
+ const auto& host = m_http_client.get_host();
+ if (host.empty())
+ {
+ return std::string();
+ }
+ return host + ":" + m_http_client.get_port();
+ }
+
+ boost::optional<uint64_t> bootstrap_daemon::get_height()
+ {
+ cryptonote::COMMAND_RPC_GET_HEIGHT::request req;
+ cryptonote::COMMAND_RPC_GET_HEIGHT::response res;
+
+ if (!invoke_http_json("/getheight", req, res))
+ {
+ return boost::none;
+ }
+
+ if (res.status != CORE_RPC_STATUS_OK)
+ {
+ return boost::none;
+ }
+
+ return res.height;
+ }
+
+ bool bootstrap_daemon::handle_result(bool success)
+ {
+ if (!success && m_get_next_public_node)
+ {
+ m_http_client.disconnect();
+ }
+
+ return success;
+ }
+
+ bool bootstrap_daemon::set_server(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials /* = boost::none */)
+ {
+ if (!m_http_client.set_server(address, credentials))
+ {
+ MERROR("Failed to set bootstrap daemon address " << address);
+ return false;
+ }
+
+ MINFO("Changed bootstrap daemon address to " << address);
+ return true;
+ }
+
+
+ bool bootstrap_daemon::switch_server_if_needed()
+ {
+ if (!m_get_next_public_node || m_http_client.is_connected())
+ {
+ return true;
+ }
+
+ const boost::optional<std::string> address = m_get_next_public_node();
+ if (address) {
+ return set_server(*address);
+ }
+
+ return false;
+ }
+
+}
diff --git a/src/rpc/bootstrap_daemon.h b/src/rpc/bootstrap_daemon.h
new file mode 100644
index 000000000..6276b1b21
--- /dev/null
+++ b/src/rpc/bootstrap_daemon.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <functional>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <boost/utility/string_ref.hpp>
+
+#include "net/http_client.h"
+#include "storages/http_abstract_invoke.h"
+
+namespace cryptonote
+{
+
+ class bootstrap_daemon
+ {
+ public:
+ bootstrap_daemon(std::function<boost::optional<std::string>()> get_next_public_node);
+ bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials);
+
+ std::string address() const noexcept;
+ boost::optional<uint64_t> get_height();
+ bool handle_result(bool success);
+
+ template <class t_request, class t_response>
+ bool invoke_http_json(const boost::string_ref uri, const t_request &out_struct, t_response &result_struct)
+ {
+ if (!switch_server_if_needed())
+ {
+ return false;
+ }
+
+ return handle_result(epee::net_utils::invoke_http_json(uri, out_struct, result_struct, m_http_client));
+ }
+
+ template <class t_request, class t_response>
+ bool invoke_http_bin(const boost::string_ref uri, const t_request &out_struct, t_response &result_struct)
+ {
+ if (!switch_server_if_needed())
+ {
+ return false;
+ }
+
+ return handle_result(epee::net_utils::invoke_http_bin(uri, out_struct, result_struct, m_http_client));
+ }
+
+ template <class t_request, class t_response>
+ bool invoke_http_json_rpc(const boost::string_ref command_name, const t_request &out_struct, t_response &result_struct)
+ {
+ if (!switch_server_if_needed())
+ {
+ return false;
+ }
+
+ return handle_result(epee::net_utils::invoke_http_json_rpc("/json_rpc", std::string(command_name.begin(), command_name.end()), out_struct, result_struct, m_http_client));
+ }
+
+ private:
+ bool set_server(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials = boost::none);
+ bool switch_server_if_needed();
+
+ private:
+ epee::net_utils::http::http_simple_client m_http_client;
+ std::function<boost::optional<std::string>()> m_get_next_public_node;
+ };
+
+}
diff --git a/src/rpc/core_rpc_server.cpp b/src/rpc/core_rpc_server.cpp
index 1fc4b816f..66af4a364 100644
--- a/src/rpc/core_rpc_server.cpp
+++ b/src/rpc/core_rpc_server.cpp
@@ -103,6 +103,7 @@ namespace cryptonote
)
: m_core(cr)
, m_p2p(p2p)
+ , m_was_bootstrap_ever_used(false)
{}
//------------------------------------------------------------------------------------------------------------------------------
bool core_rpc_server::set_bootstrap_daemon(const std::string &address, const std::string &username_password)
@@ -116,20 +117,59 @@ namespace cryptonote
return set_bootstrap_daemon(address, credentials);
}
//------------------------------------------------------------------------------------------------------------------------------
+ boost::optional<std::string> core_rpc_server::get_random_public_node()
+ {
+ COMMAND_RPC_GET_PUBLIC_NODES::request request;
+ COMMAND_RPC_GET_PUBLIC_NODES::response response;
+
+ request.gray = true;
+ request.white = true;
+ if (!on_get_public_nodes(request, response) || response.status != CORE_RPC_STATUS_OK)
+ {
+ return boost::none;
+ }
+
+ const auto get_random_node_address = [](const std::vector<public_node>& public_nodes) -> std::string {
+ const auto& random_node = public_nodes[crypto::rand_idx(public_nodes.size())];
+ const auto address = random_node.host + ":" + std::to_string(random_node.rpc_port);
+ return address;
+ };
+
+ if (!response.white.empty())
+ {
+ return get_random_node_address(response.white);
+ }
+
+ MDEBUG("No white public node found, checking gray peers");
+
+ if (!response.gray.empty())
+ {
+ return get_random_node_address(response.gray);
+ }
+
+ MERROR("Failed to find any suitable public node");
+
+ return boost::none;
+ }
+ //------------------------------------------------------------------------------------------------------------------------------
bool core_rpc_server::set_bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials)
{
boost::unique_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex);
- if (!address.empty())
+ if (address.empty())
{
- if (!m_http_client.set_server(address, credentials, epee::net_utils::ssl_support_t::e_ssl_support_autodetect))
- {
- return false;
- }
+ m_bootstrap_daemon.reset(nullptr);
+ }
+ else if (address == "auto")
+ {
+ m_bootstrap_daemon.reset(new bootstrap_daemon([this]{ return get_random_public_node(); }));
+ }
+ else
+ {
+ m_bootstrap_daemon.reset(new bootstrap_daemon(address, credentials));
}
- m_bootstrap_daemon_address = address;
- m_should_use_bootstrap_daemon = !m_bootstrap_daemon_address.empty();
+ m_should_use_bootstrap_daemon = m_bootstrap_daemon.get() != nullptr;
return true;
}
@@ -220,7 +260,10 @@ namespace cryptonote
{
{
boost::shared_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex);
- res.bootstrap_daemon_address = m_bootstrap_daemon_address;
+ if (m_bootstrap_daemon.get() != nullptr)
+ {
+ res.bootstrap_daemon_address = m_bootstrap_daemon->address();
+ }
}
crypto::hash top_hash;
m_core.get_blockchain_top(res.height_without_bootstrap, top_hash);
@@ -269,7 +312,10 @@ namespace cryptonote
else
{
boost::shared_lock<boost::shared_mutex> lock(m_bootstrap_daemon_mutex);
- res.bootstrap_daemon_address = m_bootstrap_daemon_address;
+ if (m_bootstrap_daemon.get() != nullptr)
+ {
+ res.bootstrap_daemon_address = m_bootstrap_daemon->address();
+ }
res.was_bootstrap_ever_used = m_was_bootstrap_ever_used;
}
res.database_size = m_core.get_blockchain_storage().get_db().get_database_size();
@@ -1004,7 +1050,8 @@ namespace cryptonote
res.block_reward = lMiner.get_block_reward();
}
const account_public_address& lMiningAdr = lMiner.get_mining_address();
- res.address = get_account_address_as_str(nettype(), false, lMiningAdr);
+ if (lMiner.is_mining() || lMiner.get_is_background_mining_enabled())
+ res.address = get_account_address_as_str(nettype(), false, lMiningAdr);
const uint8_t major_version = m_core.get_blockchain_storage().get_current_hard_fork_version();
const unsigned variant = major_version >= 7 ? major_version - 6 : 0;
switch (variant)
@@ -1013,6 +1060,7 @@ namespace cryptonote
case 1: res.pow_algorithm = "CNv1 (Cryptonight variant 1)"; break;
case 2: case 3: res.pow_algorithm = "CNv2 (Cryptonight variant 2)"; break;
case 4: case 5: res.pow_algorithm = "CNv4 (Cryptonight variant 4)"; break;
+ case 6: res.pow_algorithm = "RandomX"; break;
default: res.pow_algorithm = "I'm not sure actually"; break;
}
if (res.is_background_mining_enabled)
@@ -1393,6 +1441,18 @@ namespace cryptonote
LOG_ERROR("Failed to create block template");
return false;
}
+ if (b.major_version >= RX_BLOCK_VERSION)
+ {
+ uint64_t seed_height, next_height;
+ crypto::hash seed_hash;
+ crypto::rx_seedheights(res.height, &seed_height, &next_height);
+ seed_hash = m_core.get_block_id_by_height(seed_height);
+ res.seed_hash = string_tools::pod_to_hex(seed_hash);
+ if (next_height != seed_height) {
+ seed_hash = m_core.get_block_id_by_height(next_height);
+ res.next_seed_hash = string_tools::pod_to_hex(seed_hash);
+ }
+ }
store_difficulty(wdiff, res.difficulty, res.wide_difficulty, res.difficulty_top64);
blobdata block_blob = t_serializable_object_to_blob(b);
crypto::public_key tx_pub_key = cryptonote::get_tx_pub_key_from_extra(b.miner_tx);
@@ -1509,7 +1569,7 @@ namespace cryptonote
template_req.reserve_size = 1;
template_req.wallet_address = req.wallet_address;
template_req.prev_block = req.prev_block;
- submit_req.push_back(boost::value_initialized<std::string>());
+ submit_req.push_back(std::string{});
res.height = m_core.get_blockchain_storage().get_current_blockchain_height();
for(size_t i = 0; i < req.amount_of_blocks; i++)
@@ -1535,7 +1595,7 @@ namespace cryptonote
return false;
}
b.nonce = req.starting_nonce;
- miner::find_nonce_for_given_block(b, template_res.difficulty, template_res.height);
+ miner::find_nonce_for_given_block(&(m_core.get_blockchain_storage()), b, template_res.difficulty, template_res.height);
submit_req.front() = string_tools::buff_to_hex_nodelimer(block_to_blob(b));
r = on_submitblock(submit_req, submit_res, error_resp, ctx);
@@ -1580,7 +1640,7 @@ namespace cryptonote
response.reward = get_block_reward(blk);
response.block_size = response.block_weight = m_core.get_blockchain_storage().get_db().get_block_weight(height);
response.num_txes = blk.tx_hashes.size();
- response.pow_hash = fill_pow_hash ? string_tools::pod_to_hex(get_block_longhash(blk, height)) : "";
+ response.pow_hash = fill_pow_hash ? string_tools::pod_to_hex(get_block_longhash(&(m_core.get_blockchain_storage()), blk, height, 0)) : "";
response.long_term_weight = m_core.get_blockchain_storage().get_db().get_block_long_term_weight(height);
response.miner_tx_hash = string_tools::pod_to_hex(cryptonote::get_transaction_hash(blk.miner_tx));
return true;
@@ -1593,8 +1653,10 @@ namespace cryptonote
boost::upgrade_lock<boost::shared_mutex> upgrade_lock(m_bootstrap_daemon_mutex);
- if (m_bootstrap_daemon_address.empty())
+ if (m_bootstrap_daemon.get() == nullptr)
+ {
return false;
+ }
if (!m_should_use_bootstrap_daemon)
{
@@ -1610,42 +1672,38 @@ namespace cryptonote
m_bootstrap_height_check_time = current_time;
}
- uint64_t top_height;
- crypto::hash top_hash;
- m_core.get_blockchain_top(top_height, top_hash);
- ++top_height; // turn top block height into blockchain height
+ boost::optional<uint64_t> bootstrap_daemon_height = m_bootstrap_daemon->get_height();
+ if (!bootstrap_daemon_height)
+ {
+ MERROR("Failed to fetch bootstrap daemon height");
+ return false;
+ }
- // query bootstrap daemon's height
- cryptonote::COMMAND_RPC_GET_HEIGHT::request getheight_req;
- cryptonote::COMMAND_RPC_GET_HEIGHT::response getheight_res;
- bool ok = epee::net_utils::invoke_http_json("/getheight", getheight_req, getheight_res, m_http_client);
- ok = ok && getheight_res.status == CORE_RPC_STATUS_OK;
+ uint64_t target_height = m_core.get_target_blockchain_height();
+ if (*bootstrap_daemon_height < target_height)
+ {
+ MINFO("Bootstrap daemon is out of sync");
+ return m_bootstrap_daemon->handle_result(false);
+ }
- m_should_use_bootstrap_daemon = ok && top_height + 10 < getheight_res.height;
- MINFO((m_should_use_bootstrap_daemon ? "Using" : "Not using") << " the bootstrap daemon (our height: " << top_height << ", bootstrap daemon's height: " << (ok ? getheight_res.height : 0) << ")");
+ uint64_t top_height = m_core.get_current_blockchain_height();
+ m_should_use_bootstrap_daemon = top_height + 10 < *bootstrap_daemon_height;
+ MINFO((m_should_use_bootstrap_daemon ? "Using" : "Not using") << " the bootstrap daemon (our height: " << top_height << ", bootstrap daemon's height: " << *bootstrap_daemon_height << ")");
}
if (!m_should_use_bootstrap_daemon)
return false;
if (mode == invoke_http_mode::JON)
{
- r = epee::net_utils::invoke_http_json(command_name, req, res, m_http_client);
+ r = m_bootstrap_daemon->invoke_http_json(command_name, req, res);
}
else if (mode == invoke_http_mode::BIN)
{
- r = epee::net_utils::invoke_http_bin(command_name, req, res, m_http_client);
+ r = m_bootstrap_daemon->invoke_http_bin(command_name, req, res);
}
else if (mode == invoke_http_mode::JON_RPC)
{
- epee::json_rpc::request<typename COMMAND_TYPE::request> json_req = AUTO_VAL_INIT(json_req);
- epee::json_rpc::response<typename COMMAND_TYPE::response, std::string> json_resp = AUTO_VAL_INIT(json_resp);
- json_req.jsonrpc = "2.0";
- json_req.id = epee::serialization::storage_entry(0);
- json_req.method = command_name;
- json_req.params = req;
- r = net_utils::invoke_http_json("/json_rpc", json_req, json_resp, m_http_client);
- if (r)
- res = json_resp.result;
+ r = m_bootstrap_daemon->invoke_http_json_rpc(command_name, req, res);
}
else
{
@@ -1940,12 +1998,12 @@ namespace cryptonote
PERF_TIMER(on_get_bans);
auto now = time(nullptr);
- std::map<epee::net_utils::network_address, time_t> blocked_hosts = m_p2p.get_blocked_hosts();
- for (std::map<epee::net_utils::network_address, time_t>::const_iterator i = blocked_hosts.begin(); i != blocked_hosts.end(); ++i)
+ std::map<std::string, time_t> blocked_hosts = m_p2p.get_blocked_hosts();
+ for (std::map<std::string, time_t>::const_iterator i = blocked_hosts.begin(); i != blocked_hosts.end(); ++i)
{
if (i->second > now) {
COMMAND_RPC_GETBANS::ban b;
- b.host = i->first.host_str();
+ b.host = i->first;
b.ip = 0;
uint32_t ip;
if (epee::string_tools::get_ip_int32_from_string(ip, b.host))
@@ -2618,7 +2676,8 @@ namespace cryptonote
const command_line::arg_descriptor<std::string> core_rpc_server::arg_bootstrap_daemon_address = {
"bootstrap-daemon-address"
- , "URL of a 'bootstrap' remote daemon that the connected wallets can use while this daemon is still not fully synced"
+ , "URL of a 'bootstrap' remote daemon that the connected wallets can use while this daemon is still not fully synced.\n"
+ "Use 'auto' to enable automatic public nodes discovering and bootstrap daemon switching"
, ""
};
diff --git a/src/rpc/core_rpc_server.h b/src/rpc/core_rpc_server.h
index e91d4c953..379f6ed28 100644
--- a/src/rpc/core_rpc_server.h
+++ b/src/rpc/core_rpc_server.h
@@ -30,9 +30,12 @@
#pragma once
+#include <memory>
+
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/variables_map.hpp>
+#include "bootstrap_daemon.h"
#include "net/http_server_impl_base.h"
#include "net/http_client.h"
#include "core_rpc_server_commands_defs.h"
@@ -243,6 +246,7 @@ private:
//utils
uint64_t get_block_reward(const block& blk);
bool fill_block_header_response(const block& blk, bool orphan_status, uint64_t height, const crypto::hash& hash, block_header_response& response, bool fill_pow_hash);
+ boost::optional<std::string> get_random_public_node();
bool set_bootstrap_daemon(const std::string &address, const std::string &username_password);
bool set_bootstrap_daemon(const std::string &address, const boost::optional<epee::net_utils::http::login> &credentials);
enum invoke_http_mode { JON, BIN, JON_RPC };
@@ -251,9 +255,8 @@ private:
core& m_core;
nodetool::node_server<cryptonote::t_cryptonote_protocol_handler<cryptonote::core> >& m_p2p;
- std::string m_bootstrap_daemon_address;
- epee::net_utils::http::http_simple_client m_http_client;
boost::shared_mutex m_bootstrap_daemon_mutex;
+ std::unique_ptr<bootstrap_daemon> m_bootstrap_daemon;
bool m_should_use_bootstrap_daemon;
std::chrono::system_clock::time_point m_bootstrap_height_check_time;
bool m_was_bootstrap_ever_used;
diff --git a/src/rpc/core_rpc_server_commands_defs.h b/src/rpc/core_rpc_server_commands_defs.h
index 325ac4343..2760260f6 100644
--- a/src/rpc/core_rpc_server_commands_defs.h
+++ b/src/rpc/core_rpc_server_commands_defs.h
@@ -86,8 +86,8 @@ namespace cryptonote
// whether they can talk to a given daemon without having to know in
// advance which version they will stop working with
// Don't go over 32767 for any of these
-#define CORE_RPC_VERSION_MAJOR 2
-#define CORE_RPC_VERSION_MINOR 10
+#define CORE_RPC_VERSION_MAJOR 3
+#define CORE_RPC_VERSION_MINOR 0
#define MAKE_CORE_RPC_VERSION(major,minor) (((major)<<16)|(minor))
#define CORE_RPC_VERSION MAKE_CORE_RPC_VERSION(CORE_RPC_VERSION_MAJOR, CORE_RPC_VERSION_MINOR)
@@ -255,56 +255,6 @@ namespace cryptonote
};
typedef epee::misc_utils::struct_init<response_t> response;
};
-
- //-----------------------------------------------
- struct COMMAND_RPC_GET_RANDOM_OUTS
- {
- struct request_t
- {
- std::vector<std::string> amounts;
- uint32_t count;
-
- BEGIN_KV_SERIALIZE_MAP()
- KV_SERIALIZE(amounts)
- KV_SERIALIZE(count)
- END_KV_SERIALIZE_MAP()
- };
- typedef epee::misc_utils::struct_init<request_t> request;
-
-
- struct output {
- std::string public_key;
- uint64_t global_index;
- std::string rct; // 64+64+64 characters long (<rct commit> + <encrypted mask> + <rct amount>)
-
- BEGIN_KV_SERIALIZE_MAP()
- KV_SERIALIZE(public_key)
- KV_SERIALIZE(global_index)
- KV_SERIALIZE(rct)
- END_KV_SERIALIZE_MAP()
- };
-
- struct amount_out {
- uint64_t amount;
- std::vector<output> outputs;
- BEGIN_KV_SERIALIZE_MAP()
- KV_SERIALIZE(amount)
- KV_SERIALIZE(outputs)
- END_KV_SERIALIZE_MAP()
-
- };
-
- struct response_t
- {
- std::vector<amount_out> amount_outs;
- std::string Error;
- BEGIN_KV_SERIALIZE_MAP()
- KV_SERIALIZE(amount_outs)
- KV_SERIALIZE(Error)
- END_KV_SERIALIZE_MAP()
- };
- typedef epee::misc_utils::struct_init<response_t> response;
- };
//-----------------------------------------------
struct COMMAND_RPC_SUBMIT_RAW_TX
{
@@ -944,6 +894,8 @@ namespace cryptonote
uint64_t reserved_offset;
uint64_t expected_reward;
std::string prev_hash;
+ std::string seed_hash;
+ std::string next_seed_hash;
blobdata blocktemplate_blob;
blobdata blockhashing_blob;
std::string status;
@@ -961,6 +913,8 @@ namespace cryptonote
KV_SERIALIZE(blockhashing_blob)
KV_SERIALIZE(status)
KV_SERIALIZE(untrusted)
+ KV_SERIALIZE(seed_hash)
+ KV_SERIALIZE(next_seed_hash)
END_KV_SERIALIZE_MAP()
};
typedef epee::misc_utils::struct_init<response_t> response;
@@ -1566,7 +1520,7 @@ namespace cryptonote
KV_SERIALIZE(num_10m)
KV_SERIALIZE(num_not_relayed)
KV_SERIALIZE(histo_98pc)
- KV_SERIALIZE_CONTAINER_POD_AS_BLOB(histo)
+ KV_SERIALIZE(histo)
KV_SERIALIZE(num_double_spends)
END_KV_SERIALIZE_MAP()
};
diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp
index 668a2e5cd..1ee55673e 100644
--- a/src/rpc/zmq_server.cpp
+++ b/src/rpc/zmq_server.cpp
@@ -28,18 +28,29 @@
#include "zmq_server.h"
+#include <chrono>
+#include <cstdint>
+#include <system_error>
+
namespace cryptonote
{
+namespace
+{
+ constexpr const int num_zmq_threads = 1;
+ constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
+ constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
+}
+
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
- stop_signal(false),
- running(false),
- context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
+ context(zmq_init(num_zmq_threads))
{
+ if (!context)
+ MONERO_ZMQ_THROW("Unable to create ZMQ context");
}
ZmqServer::~ZmqServer()
@@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve()
{
-
- while (1)
+ try
{
- try
+ // socket must close before `zmq_term` will exit.
+ const net::zmq::socket socket = std::move(rep_socket);
+ if (!socket)
{
- zmq::message_t message;
-
- if (!rep_socket)
- {
- throw std::runtime_error("ZMQ RPC server reply socket is null");
- }
- while (rep_socket->recv(&message, 0))
- {
- std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
-
- MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
-
- std::string response = handler.handle(message_string);
-
- zmq::message_t reply(response.size());
- memcpy((void *) reply.data(), response.c_str(), response.size());
-
- rep_socket->send(reply);
- MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
-
- }
- }
- catch (const boost::thread_interrupted& e)
- {
- MDEBUG("ZMQ Server thread interrupted.");
+ MERROR("ZMQ RPC server reply socket is null");
+ return;
}
- catch (const zmq::error_t& e)
+
+ while (1)
{
- MERROR(std::string("ZMQ error: ") + e.what());
+ const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
+ MDEBUG("Received RPC request: \"" << message << "\"");
+ const std::string& response = handler.handle(message);
+
+ MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
+ MDEBUG("Sent RPC reply: \"" << response << "\"");
}
- boost::this_thread::interruption_point();
+ }
+ catch (const std::system_error& e)
+ {
+ if (e.code() != net::zmq::make_error_code(ETERM))
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (const std::exception& e)
+ {
+ MERROR("ZMQ RPC Server Error: " << e.what());
+ }
+ catch (...)
+ {
+ MERROR("Unknown error in ZMQ RPC server");
}
}
-bool ZmqServer::addIPCSocket(std::string address, std::string port)
+bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false;
}
-bool ZmqServer::addTCPSocket(std::string address, std::string port)
+bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
- try
+ if (!context)
{
- std::string addr_prefix("tcp://");
+ MERROR("ZMQ RPC Server already shutdown");
+ return false;
+ }
- rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
+ rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
+ if (!rep_socket)
+ {
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
+ return false;
+ }
- rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS));
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
+ return false;
+ }
- if (address.empty())
- address = "*";
- if (port.empty())
- port = "*";
- std::string bind_address = addr_prefix + address + std::string(":") + port;
- rep_socket->bind(bind_address.c_str());
+ static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
+ if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
+ {
+ MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
+ return false;
}
- catch (const std::exception& e)
+
+ if (address.empty())
+ address = "*";
+ if (port.empty())
+ port = "*";
+
+ std::string bind_address = "tcp://";
+ bind_address.append(address.data(), address.size());
+ bind_address += ":";
+ bind_address.append(port.data(), port.size());
+
+ if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{
- MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
+ MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false;
}
return true;
@@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run()
{
- running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}
void ZmqServer::stop()
{
- if (!running) return;
-
- stop_signal = true;
+ if (!run_thread.joinable())
+ return;
- run_thread.interrupt();
+ context.reset(); // destroying context terminates all calls
run_thread.join();
-
- running = false;
-
- return;
}
diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h
index 1b1e4c7cf..ce7892dab 100644
--- a/src/rpc/zmq_server.h
+++ b/src/rpc/zmq_server.h
@@ -29,12 +29,10 @@
#pragma once
#include <boost/thread/thread.hpp>
-#include <zmq.hpp>
-#include <string>
-#include <memory>
+#include <boost/utility/string_ref.hpp>
#include "common/command_line.h"
-
+#include "net/zmq.h"
#include "rpc_handler.h"
namespace cryptonote
@@ -43,9 +41,6 @@ namespace cryptonote
namespace rpc
{
-static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1;
-static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000;
-
class ZmqServer
{
public:
@@ -58,8 +53,8 @@ class ZmqServer
void serve();
- bool addIPCSocket(std::string address, std::string port);
- bool addTCPSocket(std::string address, std::string port);
+ bool addIPCSocket(boost::string_ref address, boost::string_ref port);
+ bool addTCPSocket(boost::string_ref address, boost::string_ref port);
void run();
void stop();
@@ -67,14 +62,11 @@ class ZmqServer
private:
RpcHandler& handler;
- volatile bool stop_signal;
- volatile bool running;
-
- zmq::context_t context;
+ net::zmq::context context;
boost::thread run_thread;
- std::unique_ptr<zmq::socket_t> rep_socket;
+ net::zmq::socket rep_socket;
};