aboutsummaryrefslogtreecommitdiff
path: root/src/daemon
diff options
context:
space:
mode:
Diffstat (limited to 'src/daemon')
-rw-r--r--src/daemon/command_line_args.h4
-rw-r--r--src/daemon/daemon.cpp71
-rw-r--r--src/daemon/daemon.h3
-rw-r--r--src/daemon/main.cpp1
4 files changed, 46 insertions, 33 deletions
diff --git a/src/daemon/command_line_args.h b/src/daemon/command_line_args.h
index 0ce987bcc..6c3e163e6 100644
--- a/src/daemon/command_line_args.h
+++ b/src/daemon/command_line_args.h
@@ -121,6 +121,10 @@ namespace daemon_args
return val;
}
};
+ const command_line::arg_descriptor<std::vector<std::string>> arg_zmq_pub = {
+ "zmq-pub"
+ , "Address for ZMQ pub - tcp://ip:port or ipc://path"
+ };
const command_line::arg_descriptor<bool> arg_zmq_rpc_disabled = {
"no-zmq"
diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp
index 96db8712b..99430b2b0 100644
--- a/src/daemon/daemon.cpp
+++ b/src/daemon/daemon.cpp
@@ -34,10 +34,12 @@
#include "misc_log_ex.h"
#include "daemon/daemon.h"
#include "rpc/daemon_handler.h"
+#include "rpc/zmq_pub.h"
#include "rpc/zmq_server.h"
#include "common/password.h"
#include "common/util.h"
+#include "cryptonote_basic/events.h"
#include "daemon/core.h"
#include "daemon/p2p.h"
#include "daemon/protocol.h"
@@ -56,6 +58,17 @@ using namespace epee;
namespace daemonize {
+struct zmq_internals
+{
+ explicit zmq_internals(t_core& core, t_p2p& p2p)
+ : rpc_handler{core.get(), p2p.get()}
+ , server{rpc_handler}
+ {}
+
+ cryptonote::rpc::DaemonHandler rpc_handler;
+ cryptonote::rpc::ZmqServer server;
+};
+
struct t_internals {
private:
t_protocol protocol;
@@ -63,6 +76,7 @@ public:
t_core core;
t_p2p p2p;
std::vector<std::unique_ptr<t_rpc>> rpcs;
+ std::unique_ptr<zmq_internals> zmq;
t_internals(
boost::program_options::variables_map const & vm
@@ -70,6 +84,7 @@ public:
: core{vm}
, protocol{vm, core, command_line::get_arg(vm, cryptonote::arg_offline)}
, p2p{vm, protocol}
+ , zmq{nullptr}
{
// Handle circular dependencies
protocol.set_p2p_endpoint(p2p.get());
@@ -86,6 +101,28 @@ public:
auto restricted_rpc_port = command_line::get_arg(vm, restricted_rpc_port_arg);
rpcs.emplace_back(new t_rpc{vm, core, p2p, true, restricted_rpc_port, "restricted", true});
}
+
+ if (!command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled))
+ {
+ zmq.reset(new zmq_internals{core, p2p});
+
+ const std::string zmq_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port);
+ const std::string zmq_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip);
+
+ if (!zmq->server.init_rpc(zmq_address, zmq_port))
+ throw std::runtime_error{"Failed to add TCP socket(" + zmq_address + ":" + zmq_port + ") to ZMQ RPC Server"};
+
+ std::shared_ptr<cryptonote::listener::zmq_pub> shared;
+ const std::vector<std::string> zmq_pub = command_line::get_arg(vm, daemon_args::arg_zmq_pub);
+ if (!zmq_pub.empty() && !(shared = zmq->server.init_pub(epee::to_span(zmq_pub))))
+ throw std::runtime_error{"Failed to initialize zmq_pub"};
+
+ if (shared)
+ {
+ core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared});
+ core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared});
+ }
+ }
}
};
@@ -103,9 +140,6 @@ t_daemon::t_daemon(
: mp_internals{new t_internals{vm}},
public_rpc_port(public_rpc_port)
{
- zmq_rpc_bind_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port);
- zmq_rpc_bind_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip);
- zmq_rpc_disabled = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled);
}
t_daemon::~t_daemon() = default;
@@ -169,31 +203,8 @@ bool t_daemon::run(bool interactive)
rpc_commands->start_handling(std::bind(&daemonize::t_daemon::stop_p2p, this));
}
- cryptonote::rpc::DaemonHandler rpc_daemon_handler(mp_internals->core.get(), mp_internals->p2p.get());
- cryptonote::rpc::ZmqServer zmq_server(rpc_daemon_handler);
-
- if (!zmq_rpc_disabled)
- {
- if (!zmq_server.addTCPSocket(zmq_rpc_bind_address, zmq_rpc_bind_port))
- {
- LOG_ERROR(std::string("Failed to add TCP Socket (") + zmq_rpc_bind_address
- + ":" + zmq_rpc_bind_port + ") to ZMQ RPC Server");
-
- if (rpc_commands)
- rpc_commands->stop_handling();
-
- for(auto& rpc : mp_internals->rpcs)
- rpc->stop();
-
- return false;
- }
-
- MINFO("Starting ZMQ server...");
- zmq_server.run();
-
- MINFO(std::string("ZMQ server started at ") + zmq_rpc_bind_address
- + ":" + zmq_rpc_bind_port + ".");
- }
+ if (mp_internals->zmq)
+ mp_internals->zmq->server.run();
else
MINFO("ZMQ server disabled");
@@ -208,8 +219,8 @@ bool t_daemon::run(bool interactive)
if (rpc_commands)
rpc_commands->stop_handling();
- if (!zmq_rpc_disabled)
- zmq_server.stop();
+ if (mp_internals->zmq)
+ mp_internals->zmq->server.stop();
for(auto& rpc : mp_internals->rpcs)
rpc->stop();
diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h
index bb7fdfebd..2eb2019ce 100644
--- a/src/daemon/daemon.h
+++ b/src/daemon/daemon.h
@@ -44,9 +44,6 @@ private:
private:
std::unique_ptr<t_internals> mp_internals;
uint16_t public_rpc_port;
- std::string zmq_rpc_bind_address;
- std::string zmq_rpc_bind_port;
- bool zmq_rpc_disabled;
public:
t_daemon(
boost::program_options::variables_map const & vm,
diff --git a/src/daemon/main.cpp b/src/daemon/main.cpp
index dfc35470e..f2ae6dcc3 100644
--- a/src/daemon/main.cpp
+++ b/src/daemon/main.cpp
@@ -154,6 +154,7 @@ int main(int argc, char const * argv[])
command_line::add_arg(core_settings, daemon_args::arg_public_node);
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_ip);
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_port);
+ command_line::add_arg(core_settings, daemon_args::arg_zmq_pub);
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_disabled);
daemonizer::init_options(hidden_options, visible_options);