diff options
Diffstat (limited to '')
-rw-r--r-- | src/daemon/command_line_args.h | 4 | ||||
-rw-r--r-- | src/daemon/daemon.cpp | 71 | ||||
-rw-r--r-- | src/daemon/daemon.h | 3 | ||||
-rw-r--r-- | src/daemon/main.cpp | 1 |
4 files changed, 46 insertions, 33 deletions
diff --git a/src/daemon/command_line_args.h b/src/daemon/command_line_args.h index 0ce987bcc..6c3e163e6 100644 --- a/src/daemon/command_line_args.h +++ b/src/daemon/command_line_args.h @@ -121,6 +121,10 @@ namespace daemon_args return val; } }; + const command_line::arg_descriptor<std::vector<std::string>> arg_zmq_pub = { + "zmq-pub" + , "Address for ZMQ pub - tcp://ip:port or ipc://path" + }; const command_line::arg_descriptor<bool> arg_zmq_rpc_disabled = { "no-zmq" diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 96db8712b..99430b2b0 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -34,10 +34,12 @@ #include "misc_log_ex.h" #include "daemon/daemon.h" #include "rpc/daemon_handler.h" +#include "rpc/zmq_pub.h" #include "rpc/zmq_server.h" #include "common/password.h" #include "common/util.h" +#include "cryptonote_basic/events.h" #include "daemon/core.h" #include "daemon/p2p.h" #include "daemon/protocol.h" @@ -56,6 +58,17 @@ using namespace epee; namespace daemonize { +struct zmq_internals +{ + explicit zmq_internals(t_core& core, t_p2p& p2p) + : rpc_handler{core.get(), p2p.get()} + , server{rpc_handler} + {} + + cryptonote::rpc::DaemonHandler rpc_handler; + cryptonote::rpc::ZmqServer server; +}; + struct t_internals { private: t_protocol protocol; @@ -63,6 +76,7 @@ public: t_core core; t_p2p p2p; std::vector<std::unique_ptr<t_rpc>> rpcs; + std::unique_ptr<zmq_internals> zmq; t_internals( boost::program_options::variables_map const & vm @@ -70,6 +84,7 @@ public: : core{vm} , protocol{vm, core, command_line::get_arg(vm, cryptonote::arg_offline)} , p2p{vm, protocol} + , zmq{nullptr} { // Handle circular dependencies protocol.set_p2p_endpoint(p2p.get()); @@ -86,6 +101,28 @@ public: auto restricted_rpc_port = command_line::get_arg(vm, restricted_rpc_port_arg); rpcs.emplace_back(new t_rpc{vm, core, p2p, true, restricted_rpc_port, "restricted", true}); } + + if (!command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled)) + { + zmq.reset(new zmq_internals{core, p2p}); + + const std::string zmq_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); + const std::string zmq_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); + + if (!zmq->server.init_rpc(zmq_address, zmq_port)) + throw std::runtime_error{"Failed to add TCP socket(" + zmq_address + ":" + zmq_port + ") to ZMQ RPC Server"}; + + std::shared_ptr<cryptonote::listener::zmq_pub> shared; + const std::vector<std::string> zmq_pub = command_line::get_arg(vm, daemon_args::arg_zmq_pub); + if (!zmq_pub.empty() && !(shared = zmq->server.init_pub(epee::to_span(zmq_pub)))) + throw std::runtime_error{"Failed to initialize zmq_pub"}; + + if (shared) + { + core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared}); + core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared}); + } + } } }; @@ -103,9 +140,6 @@ t_daemon::t_daemon( : mp_internals{new t_internals{vm}}, public_rpc_port(public_rpc_port) { - zmq_rpc_bind_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); - zmq_rpc_bind_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); - zmq_rpc_disabled = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled); } t_daemon::~t_daemon() = default; @@ -169,31 +203,8 @@ bool t_daemon::run(bool interactive) rpc_commands->start_handling(std::bind(&daemonize::t_daemon::stop_p2p, this)); } - cryptonote::rpc::DaemonHandler rpc_daemon_handler(mp_internals->core.get(), mp_internals->p2p.get()); - cryptonote::rpc::ZmqServer zmq_server(rpc_daemon_handler); - - if (!zmq_rpc_disabled) - { - if (!zmq_server.addTCPSocket(zmq_rpc_bind_address, zmq_rpc_bind_port)) - { - LOG_ERROR(std::string("Failed to add TCP Socket (") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + ") to ZMQ RPC Server"); - - if (rpc_commands) - rpc_commands->stop_handling(); - - for(auto& rpc : mp_internals->rpcs) - rpc->stop(); - - return false; - } - - MINFO("Starting ZMQ server..."); - zmq_server.run(); - - MINFO(std::string("ZMQ server started at ") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + "."); - } + if (mp_internals->zmq) + mp_internals->zmq->server.run(); else MINFO("ZMQ server disabled"); @@ -208,8 +219,8 @@ bool t_daemon::run(bool interactive) if (rpc_commands) rpc_commands->stop_handling(); - if (!zmq_rpc_disabled) - zmq_server.stop(); + if (mp_internals->zmq) + mp_internals->zmq->server.stop(); for(auto& rpc : mp_internals->rpcs) rpc->stop(); diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index bb7fdfebd..2eb2019ce 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -44,9 +44,6 @@ private: private: std::unique_ptr<t_internals> mp_internals; uint16_t public_rpc_port; - std::string zmq_rpc_bind_address; - std::string zmq_rpc_bind_port; - bool zmq_rpc_disabled; public: t_daemon( boost::program_options::variables_map const & vm, diff --git a/src/daemon/main.cpp b/src/daemon/main.cpp index dfc35470e..f2ae6dcc3 100644 --- a/src/daemon/main.cpp +++ b/src/daemon/main.cpp @@ -154,6 +154,7 @@ int main(int argc, char const * argv[]) command_line::add_arg(core_settings, daemon_args::arg_public_node); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_ip); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_port); + command_line::add_arg(core_settings, daemon_args::arg_zmq_pub); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_disabled); daemonizer::init_options(hidden_options, visible_options); |