diff options
35 files changed, 2228 insertions, 177 deletions
@@ -0,0 +1,61 @@ +# The Current/Future Status of ZMQ in Monero + +## ZMQ Pub/Sub +Client `ZMQ_SUB` sockets must "subscribe" to topics before it receives any data. +This allows filtering on the server side, so network traffic is reduced. Monero +allows for filtering on: (1) format, (2) context, and (3) event. + + * **format** refers to the _wire_ format (i.e. JSON) used to send event + information. + * **context** allows for a reduction in fields for the event, so the + daemon doesn't waste cycles serializing fields that get ignored. + * **event** refers to status changes occurring within the daemon (i.e. new + block to main chain). + + * Formats: + * `json` + * Contexts: + * `full` - the entire block or transaction is transmitted (the hash can be + computed remotely). + * `minimal` - the bare minimum for a remote client to react to an event is + sent. + * Events: + * `chain_main` - changes to the primary/main blockchain. + * `txpool_add` - new _publicly visible_ transactions in the mempool. + Includes previously unseen transactions in a block but _not_ the + `miner_tx`. Does not "re-publish" after a reorg. Includes `do_not_relay` + transactions. + +The subscription topics are formatted as `format-context-event`, with prefix +matching supported by both Monero and ZMQ. The `format`, `context` and `event` +will _never_ have hyphens or colons in their name. For example, subscribing to +`json-minimal-chain_main` will send minimal information in JSON when changes +to the main/primary blockchain occur. Whereas, subscribing to `json-minimal` +will send minimal information in JSON on all available events supported by the +daemon. + +The Monero daemon will ensure that events prefixed by `chain` will be sent in +"chain-order" - the `prev_id` (hash) field will _always_ refer to a previous +block. On rollbacks/reorgs, the event will reference an earlier block in the +chain instead of the last block. The Monero daemon also ensures that +`txpool_add` events are sent before `chain_*` events - the `chain_*` messages +will only serialize miner transactions since the other transactions were +previously published via `txpool_add`. This prevents transactions from being +serialized twice, even when the transaction was first observed in a block. + +ZMQ Pub/Sub will drop messages if the network is congested, so the above rules +for send order are used for detecting lost messages. A missing gap in `height` +or `prev_id` for `chain_*` events indicates a lost pub message. Missing +`txpool_add` messages can only be detected at the next `chain_` message. + +Since blockchain events can be dropped, clients will likely want to have a +timeout against `chain_main` events. The `GetLastBlockHeader` RPC is useful +for checking the current chain state. Dropped messages should be rare in most +conditions. + +The Monero daemon will send a `txpool_add` pub exactly once for each +transaction, even after a reorg or restarts. Clients should use the +`GetTransactionPool` after a reorg to get all transactions that have been put +back into the tx pool or been invalidated due to a double-spend. + + diff --git a/src/blockchain_db/blockchain_db.cpp b/src/blockchain_db/blockchain_db.cpp index 9e977b1b9..5c8dece2a 100644 --- a/src/blockchain_db/blockchain_db.cpp +++ b/src/blockchain_db/blockchain_db.cpp @@ -63,6 +63,7 @@ bool matches_category(relay_method method, relay_category category) noexcept { default: case relay_method::local: + case relay_method::forward: case relay_method::stem: return false; case relay_method::block: @@ -79,6 +80,7 @@ void txpool_tx_meta_t::set_relay_method(relay_method method) noexcept kept_by_block = 0; do_not_relay = 0; is_local = 0; + is_forwarding = 0; dandelionpp_stem = 0; switch (method) @@ -89,8 +91,8 @@ void txpool_tx_meta_t::set_relay_method(relay_method method) noexcept case relay_method::local: is_local = 1; break; - default: - case relay_method::fluff: + case relay_method::forward: + is_forwarding = 1; break; case relay_method::stem: dandelionpp_stem = 1; @@ -98,26 +100,45 @@ void txpool_tx_meta_t::set_relay_method(relay_method method) noexcept case relay_method::block: kept_by_block = 1; break; + default: + case relay_method::fluff: + break; } } relay_method txpool_tx_meta_t::get_relay_method() const noexcept { - if (kept_by_block) - return relay_method::block; - if (do_not_relay) - return relay_method::none; - if (is_local) - return relay_method::local; - if (dandelionpp_stem) - return relay_method::stem; + const uint8_t state = + uint8_t(kept_by_block) + + (uint8_t(do_not_relay) << 1) + + (uint8_t(is_local) << 2) + + (uint8_t(is_forwarding) << 3) + + (uint8_t(dandelionpp_stem) << 4); + + switch (state) + { + default: // error case + case 0: + break; + case 1: + return relay_method::block; + case 2: + return relay_method::none; + case 4: + return relay_method::local; + case 8: + return relay_method::forward; + case 16: + return relay_method::stem; + }; return relay_method::fluff; } bool txpool_tx_meta_t::upgrade_relay_method(relay_method method) noexcept { static_assert(relay_method::none < relay_method::local, "bad relay_method value"); - static_assert(relay_method::local < relay_method::stem, "bad relay_method value"); + static_assert(relay_method::local < relay_method::forward, "bad relay_method value"); + static_assert(relay_method::forward < relay_method::stem, "bad relay_method value"); static_assert(relay_method::stem < relay_method::fluff, "bad relay_method value"); static_assert(relay_method::fluff < relay_method::block, "bad relay_method value"); diff --git a/src/blockchain_db/blockchain_db.h b/src/blockchain_db/blockchain_db.h index f513651ed..9a321437b 100644 --- a/src/blockchain_db/blockchain_db.h +++ b/src/blockchain_db/blockchain_db.h @@ -160,7 +160,7 @@ struct txpool_tx_meta_t uint64_t max_used_block_height; uint64_t last_failed_height; uint64_t receive_time; - uint64_t last_relayed_time; //!< If Dandelion++ stem, randomized embargo timestamp. Otherwise, last relayed timestmap. + uint64_t last_relayed_time; //!< If received over i2p/tor, randomized forward time. If Dandelion++stem, randomized embargo time. Otherwise, last relayed timestamp // 112 bytes uint8_t kept_by_block; uint8_t relayed; @@ -169,7 +169,8 @@ struct txpool_tx_meta_t uint8_t pruned: 1; uint8_t is_local: 1; uint8_t dandelionpp_stem : 1; - uint8_t bf_padding: 4; + uint8_t is_forwarding: 1; + uint8_t bf_padding: 3; uint8_t padding[76]; // till 192 bytes diff --git a/src/common/notify.cpp b/src/common/notify.cpp index e2df5096d..f31100214 100644 --- a/src/common/notify.cpp +++ b/src/common/notify.cpp @@ -62,7 +62,7 @@ static void replace(std::vector<std::string> &v, const char *tag, const char *s) boost::replace_all(str, tag, s); } -int Notify::notify(const char *tag, const char *s, ...) +int Notify::notify(const char *tag, const char *s, ...) const { std::vector<std::string> margs = args; diff --git a/src/common/notify.h b/src/common/notify.h index f813e8def..65d4e1072 100644 --- a/src/common/notify.h +++ b/src/common/notify.h @@ -38,8 +38,12 @@ class Notify { public: Notify(const char *spec); + Notify(const Notify&) = default; + Notify(Notify&&) = default; + Notify& operator=(const Notify&) = default; + Notify& operator=(Notify&&) = default; - int notify(const char *tag, const char *s, ...); + int notify(const char *tag, const char *s, ...) const; private: std::string filename; @@ -47,3 +51,4 @@ private: }; } + diff --git a/src/cryptonote_basic/events.h b/src/cryptonote_basic/events.h new file mode 100644 index 000000000..6c6742215 --- /dev/null +++ b/src/cryptonote_basic/events.h @@ -0,0 +1,46 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "crypto/hash.h" +#include "cryptonote_basic/cryptonote_basic.h" + +namespace cryptonote +{ + /*! Transactions are expensive to move or copy (lots of 32-byte internal + buffers). This allows `cryptonote::core` to do a single notification for + a vector of transactions, without having to move/copy duplicate or invalid + transactions. */ + struct txpool_event + { + cryptonote::transaction tx; + crypto::hash hash; + bool res; //!< Listeners must ignore `tx` when this is false. + }; +} diff --git a/src/cryptonote_basic/fwd.h b/src/cryptonote_basic/fwd.h new file mode 100644 index 000000000..d54223461 --- /dev/null +++ b/src/cryptonote_basic/fwd.h @@ -0,0 +1,36 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace cryptonote +{ + struct block; + class transaction; + struct txpool_event; +} diff --git a/src/cryptonote_config.h b/src/cryptonote_config.h index 7dc883c90..8c4e61d4d 100644 --- a/src/cryptonote_config.h +++ b/src/cryptonote_config.h @@ -117,6 +117,11 @@ #define CRYPTONOTE_NOISE_BYTES 3*1024 // 3 KiB #define CRYPTONOTE_NOISE_CHANNELS 2 // Max outgoing connections per zone used for noise/covert sending +// Both below are in seconds. The idea is to delay forwarding from i2p/tor +// to ipv4/6, such that 2+ incoming connections _could_ have sent the tx +#define CRYPTONOTE_FORWARD_DELAY_BASE (CRYPTONOTE_NOISE_MIN_DELAY + CRYPTONOTE_NOISE_DELAY_RANGE) +#define CRYPTONOTE_FORWARD_DELAY_AVERAGE (CRYPTONOTE_FORWARD_DELAY_BASE + (CRYPTONOTE_FORWARD_DELAY_BASE / 2)) + #define CRYPTONOTE_MAX_FRAGMENTS 20 // ~20 * NOISE_BYTES max payload size for covert/noise send #define COMMAND_RPC_GET_BLOCKS_FAST_MAX_COUNT 1000 diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index 7851b0f6a..b0726981c 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -1234,10 +1234,15 @@ bool Blockchain::switch_to_alternative_blockchain(std::list<block_extended_info> reorg_notify->notify("%s", std::to_string(split_height).c_str(), "%h", std::to_string(m_db->height()).c_str(), "%n", std::to_string(m_db->height() - split_height).c_str(), "%d", std::to_string(discarded_blocks).c_str(), NULL); - std::shared_ptr<tools::Notify> block_notify = m_block_notify; - if (block_notify) - for (const auto &bei: alt_chain) - block_notify->notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bei.bl)).c_str(), NULL); + for (const auto& notifier : m_block_notifiers) + { + std::size_t notify_height = split_height; + for (const auto& bei: alt_chain) + { + notifier(notify_height, {std::addressof(bei.bl), 1}); + ++notify_height; + } + } MGINFO_GREEN("REORGANIZE SUCCESS! on height: " << split_height << ", new blockchain size: " << m_db->height()); return true; @@ -4236,12 +4241,9 @@ leave: get_difficulty_for_next_block(); // just to cache it invalidate_block_template_cache(); - if (notify) - { - std::shared_ptr<tools::Notify> block_notify = m_block_notify; - if (block_notify) - block_notify->notify("%s", epee::string_tools::pod_to_hex(id).c_str(), NULL); - } + + for (const auto& notifier: m_block_notifiers) + notifier(new_height - 1, {std::addressof(bl), 1}); return true; } @@ -5132,6 +5134,15 @@ void Blockchain::set_user_options(uint64_t maxthreads, bool sync_on_blocks, uint m_max_prepare_blocks_threads = maxthreads; } +void Blockchain::add_block_notify(boost::function<void(std::uint64_t, epee::span<const block>)>&& notify) +{ + if (notify) + { + CRITICAL_REGION_LOCAL(m_blockchain_lock); + m_block_notifiers.push_back(std::move(notify)); + } +} + void Blockchain::safesyncmode(const bool onoff) { /* all of this is no-op'd if the user set a specific diff --git a/src/cryptonote_core/blockchain.h b/src/cryptonote_core/blockchain.h index fb7e5c4f8..703dd6400 100644 --- a/src/cryptonote_core/blockchain.h +++ b/src/cryptonote_core/blockchain.h @@ -30,6 +30,7 @@ #pragma once #include <boost/asio/io_service.hpp> +#include <boost/function/function_fwd.hpp> #include <boost/serialization/serialization.hpp> #include <boost/serialization/version.hpp> #include <boost/serialization/list.hpp> @@ -764,7 +765,7 @@ namespace cryptonote * * @param notify the notify object to call at every new block */ - void set_block_notify(const std::shared_ptr<tools::Notify> ¬ify) { m_block_notify = notify; } + void add_block_notify(boost::function<void(std::uint64_t, epee::span<const block>)> &¬ify); /** * @brief sets a reorg notify object to call for every reorg @@ -1125,7 +1126,11 @@ namespace cryptonote bool m_batch_success; - std::shared_ptr<tools::Notify> m_block_notify; + /* `boost::function` is used because the implementation never allocates if + the callable object has a single `std::shared_ptr` or `std::weap_ptr` + internally. Whereas, the libstdc++ `std::function` will allocate. */ + + std::vector<boost::function<void(std::uint64_t, epee::span<const block>)>> m_block_notifiers; std::shared_ptr<tools::Notify> m_reorg_notify; // for prepare_handle_incoming_blocks diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 141e54459..8e30d6676 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -41,6 +41,7 @@ using namespace epee; #include "common/download.h" #include "common/threadpool.h" #include "common/command_line.h" +#include "cryptonote_basic/events.h" #include "warnings.h" #include "crypto/crypto.h" #include "cryptonote_config.h" @@ -51,6 +52,7 @@ using namespace epee; #include "ringct/rctTypes.h" #include "blockchain_db/blockchain_db.h" #include "ringct/rctSigs.h" +#include "rpc/zmq_pub.h" #include "common/notify.h" #include "hardforks/hardforks.h" #include "version.h" @@ -262,6 +264,13 @@ namespace cryptonote { m_blockchain_storage.set_enforce_dns_checkpoints(enforce_dns); } + //----------------------------------------------------------------------------------- + void core::set_txpool_listener(boost::function<void(std::vector<txpool_event>)> zmq_pub) + { + CRITICAL_REGION_LOCAL(m_incoming_tx_lock); + m_zmq_pub = std::move(zmq_pub); + } + //----------------------------------------------------------------------------------------------- bool core::update_checkpoints(const bool skip_dns /* = false */) { @@ -614,7 +623,20 @@ namespace cryptonote try { if (!command_line::is_arg_defaulted(vm, arg_block_notify)) - m_blockchain_storage.set_block_notify(std::shared_ptr<tools::Notify>(new tools::Notify(command_line::get_arg(vm, arg_block_notify).c_str()))); + { + struct hash_notify + { + tools::Notify cmdline; + + void operator()(std::uint64_t, epee::span<const block> blocks) const + { + for (const block bl : blocks) + cmdline.notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bl)).c_str(), NULL); + } + }; + + m_blockchain_storage.add_block_notify(hash_notify{{command_line::get_arg(vm, arg_block_notify).c_str()}}); + } } catch (const std::exception &e) { @@ -957,8 +979,7 @@ namespace cryptonote return false; } - struct result { bool res; cryptonote::transaction tx; crypto::hash hash; }; - std::vector<result> results(tx_blobs.size()); + std::vector<txpool_event> results(tx_blobs.size()); CRITICAL_REGION_LOCAL(m_incoming_tx_lock); @@ -1023,6 +1044,7 @@ namespace cryptonote if (!tx_info.empty()) handle_incoming_tx_accumulated_batch(tx_info, tx_relay == relay_method::block); + bool valid_events = false; bool ok = true; it = tx_blobs.begin(); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { @@ -1045,10 +1067,18 @@ namespace cryptonote {MERROR_VER("Transaction verification impossible: " << results[i].hash);} if(tvc[i].m_added_to_pool) + { MDEBUG("tx added: " << results[i].hash); + valid_events = true; + } + else + results[i].res = false; } - return ok; + if (valid_events && m_zmq_pub && matches_category(tx_relay, relay_category::legacy)) + m_zmq_pub(std::move(results)); + + return ok; CATCH_ENTRY_L0("core::handle_incoming_txs()", false); } //----------------------------------------------------------------------------------------------- @@ -1273,6 +1303,7 @@ namespace cryptonote { NOTIFY_NEW_TRANSACTIONS::request public_req{}; NOTIFY_NEW_TRANSACTIONS::request private_req{}; + NOTIFY_NEW_TRANSACTIONS::request stem_req{}; for (auto& tx : txs) { switch (std::get<2>(tx)) @@ -1283,6 +1314,9 @@ namespace cryptonote case relay_method::local: private_req.txs.push_back(std::move(std::get<1>(tx))); break; + case relay_method::forward: + stem_req.txs.push_back(std::move(std::get<1>(tx))); + break; case relay_method::block: case relay_method::fluff: case relay_method::stem: @@ -1300,6 +1334,8 @@ namespace cryptonote get_protocol()->relay_transactions(public_req, source, epee::net_utils::zone::public_, relay_method::fluff); if (!private_req.txs.empty()) get_protocol()->relay_transactions(private_req, source, epee::net_utils::zone::invalid, relay_method::local); + if (!stem_req.txs.empty()) + get_protocol()->relay_transactions(stem_req, source, epee::net_utils::zone::public_, relay_method::stem); } return true; } diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 6a9ffda92..a53596c2c 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -32,9 +32,11 @@ #include <ctime> +#include <boost/function.hpp> #include <boost/program_options/options_description.hpp> #include <boost/program_options/variables_map.hpp> +#include "cryptonote_basic/fwd.h" #include "cryptonote_core/i_core_events.h" #include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "cryptonote_protocol/enums.h" @@ -48,6 +50,7 @@ #include "warnings.h" #include "crypto/hash.h" #include "span.h" +#include "rpc/fwd.h" PUSH_WARNINGS DISABLE_VS_WARNINGS(4355) @@ -446,6 +449,13 @@ namespace cryptonote void set_enforce_dns_checkpoints(bool enforce_dns); /** + * @brief set a listener for txes being added to the txpool + * + * @param callable to notify, or empty function to disable. + */ + void set_txpool_listener(boost::function<void(std::vector<txpool_event>)> zmq_pub); + + /** * @brief set whether or not to enable or disable DNS checkpoints * * @param disble whether to disable DNS checkpoints @@ -1098,7 +1108,12 @@ namespace cryptonote bool m_fluffy_blocks_enabled; bool m_offline; + /* `boost::function` is used because the implementation never allocates if + the callable object has a single `std::shared_ptr` or `std::weap_ptr` + internally. Whereas, the libstdc++ `std::function` will allocate. */ + std::shared_ptr<tools::Notify> m_block_rate_notify; + boost::function<void(std::vector<txpool_event>)> m_zmq_pub; }; } diff --git a/src/cryptonote_core/tx_pool.cpp b/src/cryptonote_core/tx_pool.cpp index 74aab88c4..7cb0e4062 100644 --- a/src/cryptonote_core/tx_pool.cpp +++ b/src/cryptonote_core/tx_pool.cpp @@ -91,6 +91,8 @@ namespace cryptonote time_t const MAX_RELAY_TIME = (60 * 60 * 4); // at most that many seconds between resends float const ACCEPT_THRESHOLD = 1.0f; + constexpr const std::chrono::seconds forward_delay_average{CRYPTONOTE_FORWARD_DELAY_AVERAGE}; + // a kind of increasing backoff within min/max bounds uint64_t get_relay_delay(time_t now, time_t received) { @@ -309,8 +311,14 @@ namespace cryptonote if (meta.upgrade_relay_method(tx_relay) || !existing_tx) // synchronize with embargo timer or stem/fluff out-of-order messages { + using clock = std::chrono::system_clock; + auto last_relayed_time = std::numeric_limits<decltype(meta.last_relayed_time)>::max(); + if (tx_relay == relay_method::forward) + last_relayed_time = clock::to_time_t(clock::now() + crypto::random_poisson_seconds{forward_delay_average}()); + // else the `set_relayed` function will adjust the time accordingly later + //update transactions container - meta.last_relayed_time = std::numeric_limits<decltype(meta.last_relayed_time)>::max(); + meta.last_relayed_time = last_relayed_time; meta.receive_time = receive_time; meta.weight = tx_weight; meta.fee = fee; @@ -341,7 +349,7 @@ namespace cryptonote tvc.m_added_to_pool = true; static_assert(unsigned(relay_method::none) == 0, "expected relay_method::none value to be zero"); - if(meta.fee > 0) + if(meta.fee > 0 && tx_relay != relay_method::forward) tvc.m_relay = tx_relay; } @@ -722,28 +730,46 @@ namespace cryptonote //TODO: investigate whether boolean return is appropriate bool tx_memory_pool::get_relayable_transactions(std::vector<std::tuple<crypto::hash, cryptonote::blobdata, relay_method>> &txs) const { + std::vector<std::pair<crypto::hash, txpool_tx_meta_t>> change_timestamps; + const uint64_t now = time(NULL); + CRITICAL_REGION_LOCAL(m_transactions_lock); CRITICAL_REGION_LOCAL1(m_blockchain); - const uint64_t now = time(NULL); + LockedTXN lock(m_blockchain.get_db()); txs.reserve(m_blockchain.get_txpool_tx_count()); - m_blockchain.for_all_txpool_txes([this, now, &txs](const crypto::hash &txid, const txpool_tx_meta_t &meta, const cryptonote::blobdata *){ + m_blockchain.for_all_txpool_txes([this, now, &txs, &change_timestamps](const crypto::hash &txid, const txpool_tx_meta_t &meta, const cryptonote::blobdata *){ // 0 fee transactions are never relayed if(!meta.pruned && meta.fee > 0 && !meta.do_not_relay) { - if (!meta.dandelionpp_stem && now - meta.last_relayed_time <= get_relay_delay(now, meta.receive_time)) - return true; - if (meta.dandelionpp_stem && meta.last_relayed_time < now) // for dandelion++ stem, this value is the embargo timeout - return true; + const relay_method tx_relay = meta.get_relay_method(); + switch (tx_relay) + { + case relay_method::stem: + case relay_method::forward: + if (meta.last_relayed_time > now) + return true; // continue to next tx + change_timestamps.emplace_back(txid, meta); + break; + default: + case relay_method::none: + return true; + case relay_method::local: + case relay_method::fluff: + case relay_method::block: + if (now - meta.last_relayed_time <= get_relay_delay(now, meta.receive_time)) + return true; // continue to next tx + break; + } // if the tx is older than half the max lifetime, we don't re-relay it, to avoid a problem // mentioned by smooth where nodes would flush txes at slightly different times, causing // flushed txes to be re-added when received from a node which was just about to flush it - uint64_t max_age = meta.kept_by_block ? CRYPTONOTE_MEMPOOL_TX_FROM_ALT_BLOCK_LIVETIME : CRYPTONOTE_MEMPOOL_TX_LIVETIME; + uint64_t max_age = (tx_relay == relay_method::block) ? CRYPTONOTE_MEMPOOL_TX_FROM_ALT_BLOCK_LIVETIME : CRYPTONOTE_MEMPOOL_TX_LIVETIME; if (now - meta.receive_time <= max_age / 2) { try { - txs.emplace_back(txid, m_blockchain.get_txpool_tx_blob(txid, relay_category::all), meta.get_relay_method()); + txs.emplace_back(txid, m_blockchain.get_txpool_tx_blob(txid, relay_category::all), tx_relay); } catch (const std::exception &e) { @@ -754,6 +780,18 @@ namespace cryptonote } return true; }, false, relay_category::relayable); + + for (auto& elem : change_timestamps) + { + /* These transactions are still in forward or stem state, so the field + represents the next time a relay should be attempted. Will be + overwritten when the state is upgraded to stem, fluff or block. This + function is only called every ~2 minutes, so this resetting should be + unnecessary, but is primarily a precaution against potential changes + to the callback routines. */ + elem.second.last_relayed_time = now + get_relay_delay(now, elem.second.receive_time); + m_blockchain.update_txpool_tx(elem.first, elem.second); + } return true; } //--------------------------------------------------------------------------------- diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index 02c416af5..bd14fe0a7 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -935,7 +935,19 @@ namespace cryptonote return 1; } - relay_method tx_relay; + /* If the txes were received over i2p/tor, the default is to "forward" + with a randomized delay to further enhance the "white noise" behavior, + potentially making it harder for ISP-level spies to determine which + inbound link sent the tx. If the sender disabled "white noise" over + i2p/tor, then the sender is "fluffing" (to only outbound) i2p/tor + connections with the `dandelionpp_fluff` flag set. The receiver (hidden + service) will immediately fluff in that scenario (i.e. this assumes that a + sybil spy will be unable to link an IP to an i2p/tor connection). */ + + const epee::net_utils::zone zone = context.m_remote_address.get_zone(); + relay_method tx_relay = zone == epee::net_utils::zone::public_ ? + relay_method::stem : relay_method::forward; + std::vector<blobdata> stem_txs{}; std::vector<blobdata> fluff_txs{}; if (arg.dandelionpp_fluff) @@ -944,10 +956,7 @@ namespace cryptonote fluff_txs.reserve(arg.txs.size()); } else - { - tx_relay = relay_method::stem; stem_txs.reserve(arg.txs.size()); - } for (auto& tx : arg.txs) { @@ -970,6 +979,7 @@ namespace cryptonote fluff_txs.push_back(std::move(tx)); break; default: + case relay_method::forward: // not supposed to happen here case relay_method::none: break; } diff --git a/src/cryptonote_protocol/enums.h b/src/cryptonote_protocol/enums.h index fabb82c61..c0c495837 100644 --- a/src/cryptonote_protocol/enums.h +++ b/src/cryptonote_protocol/enums.h @@ -37,6 +37,7 @@ namespace cryptonote { none = 0, //!< Received via RPC with `do_not_relay` set local, //!< Received via RPC; trying to send over i2p/tor, etc. + forward, //!< Received over i2p/tor; timer delayed before ipv4/6 public broadcast stem, //!< Received/send over network using Dandelion++ stem fluff, //!< Received/sent over network using Dandelion++ fluff block //!< Received in block, takes precedence over others diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index 56181a59b..7c482156f 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -357,11 +357,15 @@ namespace levin return true; }); - // Always send txs in stem mode over i2p/tor, see comments in `send_txs` below. + /* Always send with `fluff` flag, even over i2p/tor. The hidden service + will disable the forwarding delay and immediately fluff. The i2p/tor + network is therefore replacing the sybil protection of Dandelion++. + Dandelion++ stem phase over i2p/tor is also worth investigating + (with/without "noise"?). */ for (auto& connection : connections) { std::sort(connection.first.begin(), connection.first.end()); // don't leak receive order - make_payload_send_txs(*zone_->p2p, std::move(connection.first), connection.second, zone_->pad_txs, zone_->is_public); + make_payload_send_txs(*zone_->p2p, std::move(connection.first), connection.second, zone_->pad_txs, true); } if (next_flush != std::chrono::steady_clock::time_point::max()) @@ -811,12 +815,11 @@ namespace levin case relay_method::block: return false; case relay_method::stem: - tx_relay = relay_method::fluff; // don't set stempool embargo when skipping to fluff - /* fallthrough */ + case relay_method::forward: case relay_method::local: if (zone_->is_public) { - // this will change a local tx to stem or fluff ... + // this will change a local/forward tx to stem or fluff ... zone_->strand.dispatch( dandelionpp_notify{zone_, std::addressof(core), std::move(txs), source} ); @@ -824,6 +827,11 @@ namespace levin } /* fallthrough */ case relay_method::fluff: + /* If sending stem/forward/local txes over non public networks, + continue to claim that relay mode even though it used the "fluff" + routine. A "fluff" over i2p/tor is not the same as a "fluff" over + ipv4/6. Marking it as "fluff" here will make the tx immediately + visible externally from this node, which is not desired. */ core.on_transactions_relayed(epee::to_span(txs), tx_relay); zone_->strand.dispatch(fluff_notify{zone_, std::move(txs), source}); break; diff --git a/src/daemon/command_line_args.h b/src/daemon/command_line_args.h index 0ce987bcc..6c3e163e6 100644 --- a/src/daemon/command_line_args.h +++ b/src/daemon/command_line_args.h @@ -121,6 +121,10 @@ namespace daemon_args return val; } }; + const command_line::arg_descriptor<std::vector<std::string>> arg_zmq_pub = { + "zmq-pub" + , "Address for ZMQ pub - tcp://ip:port or ipc://path" + }; const command_line::arg_descriptor<bool> arg_zmq_rpc_disabled = { "no-zmq" diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 96db8712b..99430b2b0 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -34,10 +34,12 @@ #include "misc_log_ex.h" #include "daemon/daemon.h" #include "rpc/daemon_handler.h" +#include "rpc/zmq_pub.h" #include "rpc/zmq_server.h" #include "common/password.h" #include "common/util.h" +#include "cryptonote_basic/events.h" #include "daemon/core.h" #include "daemon/p2p.h" #include "daemon/protocol.h" @@ -56,6 +58,17 @@ using namespace epee; namespace daemonize { +struct zmq_internals +{ + explicit zmq_internals(t_core& core, t_p2p& p2p) + : rpc_handler{core.get(), p2p.get()} + , server{rpc_handler} + {} + + cryptonote::rpc::DaemonHandler rpc_handler; + cryptonote::rpc::ZmqServer server; +}; + struct t_internals { private: t_protocol protocol; @@ -63,6 +76,7 @@ public: t_core core; t_p2p p2p; std::vector<std::unique_ptr<t_rpc>> rpcs; + std::unique_ptr<zmq_internals> zmq; t_internals( boost::program_options::variables_map const & vm @@ -70,6 +84,7 @@ public: : core{vm} , protocol{vm, core, command_line::get_arg(vm, cryptonote::arg_offline)} , p2p{vm, protocol} + , zmq{nullptr} { // Handle circular dependencies protocol.set_p2p_endpoint(p2p.get()); @@ -86,6 +101,28 @@ public: auto restricted_rpc_port = command_line::get_arg(vm, restricted_rpc_port_arg); rpcs.emplace_back(new t_rpc{vm, core, p2p, true, restricted_rpc_port, "restricted", true}); } + + if (!command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled)) + { + zmq.reset(new zmq_internals{core, p2p}); + + const std::string zmq_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); + const std::string zmq_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); + + if (!zmq->server.init_rpc(zmq_address, zmq_port)) + throw std::runtime_error{"Failed to add TCP socket(" + zmq_address + ":" + zmq_port + ") to ZMQ RPC Server"}; + + std::shared_ptr<cryptonote::listener::zmq_pub> shared; + const std::vector<std::string> zmq_pub = command_line::get_arg(vm, daemon_args::arg_zmq_pub); + if (!zmq_pub.empty() && !(shared = zmq->server.init_pub(epee::to_span(zmq_pub)))) + throw std::runtime_error{"Failed to initialize zmq_pub"}; + + if (shared) + { + core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared}); + core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared}); + } + } } }; @@ -103,9 +140,6 @@ t_daemon::t_daemon( : mp_internals{new t_internals{vm}}, public_rpc_port(public_rpc_port) { - zmq_rpc_bind_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); - zmq_rpc_bind_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); - zmq_rpc_disabled = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled); } t_daemon::~t_daemon() = default; @@ -169,31 +203,8 @@ bool t_daemon::run(bool interactive) rpc_commands->start_handling(std::bind(&daemonize::t_daemon::stop_p2p, this)); } - cryptonote::rpc::DaemonHandler rpc_daemon_handler(mp_internals->core.get(), mp_internals->p2p.get()); - cryptonote::rpc::ZmqServer zmq_server(rpc_daemon_handler); - - if (!zmq_rpc_disabled) - { - if (!zmq_server.addTCPSocket(zmq_rpc_bind_address, zmq_rpc_bind_port)) - { - LOG_ERROR(std::string("Failed to add TCP Socket (") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + ") to ZMQ RPC Server"); - - if (rpc_commands) - rpc_commands->stop_handling(); - - for(auto& rpc : mp_internals->rpcs) - rpc->stop(); - - return false; - } - - MINFO("Starting ZMQ server..."); - zmq_server.run(); - - MINFO(std::string("ZMQ server started at ") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + "."); - } + if (mp_internals->zmq) + mp_internals->zmq->server.run(); else MINFO("ZMQ server disabled"); @@ -208,8 +219,8 @@ bool t_daemon::run(bool interactive) if (rpc_commands) rpc_commands->stop_handling(); - if (!zmq_rpc_disabled) - zmq_server.stop(); + if (mp_internals->zmq) + mp_internals->zmq->server.stop(); for(auto& rpc : mp_internals->rpcs) rpc->stop(); diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index bb7fdfebd..2eb2019ce 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -44,9 +44,6 @@ private: private: std::unique_ptr<t_internals> mp_internals; uint16_t public_rpc_port; - std::string zmq_rpc_bind_address; - std::string zmq_rpc_bind_port; - bool zmq_rpc_disabled; public: t_daemon( boost::program_options::variables_map const & vm, diff --git a/src/daemon/main.cpp b/src/daemon/main.cpp index dfc35470e..f2ae6dcc3 100644 --- a/src/daemon/main.cpp +++ b/src/daemon/main.cpp @@ -154,6 +154,7 @@ int main(int argc, char const * argv[]) command_line::add_arg(core_settings, daemon_args::arg_public_node); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_ip); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_port); + command_line::add_arg(core_settings, daemon_args::arg_zmq_pub); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_disabled); daemonizer::init_options(hidden_options, visible_options); diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp index 1a0edb4b9..15560ca7e 100644 --- a/src/net/zmq.cpp +++ b/src/net/zmq.cpp @@ -158,20 +158,6 @@ namespace zmq return unsigned(max_out) < added ? max_out : int(added); } }; - - template<typename F, typename... T> - expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...))) - { - for (;;) - { - if (0 <= op(args...)) - return success(); - - const int error = zmq_errno(); - if (error != EINTR) - return make_error_code(error); - } - } } // anonymous expect<std::string> receive(void* const socket, const int flags) diff --git a/src/net/zmq.h b/src/net/zmq.h index 8c587ed7c..fa4ef2fc9 100644 --- a/src/net/zmq.h +++ b/src/net/zmq.h @@ -26,6 +26,8 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + #include <memory> #include <string> #include <system_error> @@ -105,6 +107,26 @@ namespace zmq //! Unique ZMQ socket handle, calls `zmq_close` on destruction. using socket = std::unique_ptr<void, close>; + /*! Retry a ZMQ function on `EINTR` errors. `F` must return an int with + values less than 0 on error. + + \param op The ZMQ function to execute + retry + \param args Forwarded to `op`. Must be resuable in case of retry. + \return All errors except for `EINTR`. */ + template<typename F, typename... T> + expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...))) + { + for (;;) + { + if (0 <= op(args...)) + return success(); + + const int error = zmq_errno(); + if (error != EINTR) + return make_error_code(error); + } + } + /*! Read all parts of the next message on `socket`. Blocks until the entire next message (all parts) are read, or until `zmq_term` is called on the `zmq_context` associated with `socket`. If the context is terminated, diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index fb3a38b07..175741146 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -604,16 +604,12 @@ namespace nodetool if (nettype == cryptonote::TESTNET) { full_addrs.insert("212.83.175.67:28080"); - full_addrs.insert("5.9.100.248:28080"); - full_addrs.insert("163.172.182.165:28080"); - full_addrs.insert("195.154.123.123:28080"); full_addrs.insert("212.83.172.165:28080"); full_addrs.insert("192.110.160.146:28080"); } else if (nettype == cryptonote::STAGENET) { full_addrs.insert("162.210.173.150:38080"); - full_addrs.insert("162.210.173.151:38080"); full_addrs.insert("192.110.160.146:38080"); } else if (nettype == cryptonote::FAKECHAIN) @@ -621,13 +617,7 @@ namespace nodetool } else { - full_addrs.insert("107.152.130.98:18080"); full_addrs.insert("212.83.175.67:18080"); - full_addrs.insert("5.9.100.248:18080"); - full_addrs.insert("163.172.182.165:18080"); - full_addrs.insert("161.67.132.39:18080"); - full_addrs.insert("198.74.231.92:18080"); - full_addrs.insert("195.154.123.123:18080"); full_addrs.insert("212.83.172.165:18080"); full_addrs.insert("192.110.160.146:18080"); full_addrs.insert("88.198.163.90:18080"); diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 35195bd98..19298c969 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -45,8 +45,11 @@ set(daemon_messages_sources message.cpp daemon_messages.cpp) +set(rpc_pub_sources zmq_pub.cpp) + set(daemon_rpc_server_sources daemon_handler.cpp + zmq_pub.cpp zmq_server.cpp) @@ -59,8 +62,9 @@ set(rpc_headers rpc_version_str.h rpc_handler.h) -set(daemon_rpc_server_headers) +set(rpc_pub_headers zmq_pub.h) +set(daemon_rpc_server_headers) set(rpc_daemon_private_headers bootstrap_daemon.h @@ -83,6 +87,8 @@ set(daemon_rpc_server_private_headers monero_private_headers(rpc ${rpc_private_headers}) +set(rpc_pub_private_headers) + monero_private_headers(daemon_rpc_server ${daemon_rpc_server_private_headers}) @@ -97,6 +103,11 @@ monero_add_library(rpc ${rpc_headers} ${rpc_private_headers}) +monero_add_library(rpc_pub + ${rpc_pub_sources} + ${rpc_pub_headers} + ${rpc_pub_private_headers}) + monero_add_library(daemon_messages ${daemon_messages_sources} ${daemon_messages_headers} @@ -131,6 +142,14 @@ target_link_libraries(rpc PRIVATE ${EXTRA_LIBRARIES}) +target_link_libraries(rpc_pub + PUBLIC + epee + net + cryptonote_basic + serialization + ${Boost_THREAD_LIBRARY}) + target_link_libraries(daemon_messages LINK_PRIVATE cryptonote_core @@ -142,6 +161,7 @@ target_link_libraries(daemon_messages target_link_libraries(daemon_rpc_server LINK_PRIVATE rpc + rpc_pub cryptonote_core cryptonote_protocol version diff --git a/src/rpc/fwd.h b/src/rpc/fwd.h new file mode 100644 index 000000000..72537f5a5 --- /dev/null +++ b/src/rpc/fwd.h @@ -0,0 +1,37 @@ +// Copyright (c) 2019-2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace cryptonote +{ + namespace listener + { + class zmq_pub; + } +} diff --git a/src/rpc/zmq_pub.cpp b/src/rpc/zmq_pub.cpp new file mode 100644 index 000000000..0dffffac6 --- /dev/null +++ b/src/rpc/zmq_pub.cpp @@ -0,0 +1,478 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "zmq_pub.h" + +#include <algorithm> +#include <boost/range/adaptor/filtered.hpp> +#include <boost/range/adaptor/transformed.hpp> +#include <boost/thread/locks.hpp> +#include <cassert> +#include <cstdint> +#include <cstring> +#include <rapidjson/document.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> +#include <stdexcept> +#include <string> +#include <utility> + +#include "common/expect.h" +#include "crypto/crypto.h" +#include "cryptonote_basic/cryptonote_format_utils.h" +#include "cryptonote_basic/events.h" +#include "misc_log_ex.h" +#include "serialization/json_object.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq" + +namespace +{ + constexpr const char txpool_signal[] = "tx_signal"; + + using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>); + using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>); + + template<typename F> + struct context + { + char const* const name; + F* generate_pub; + }; + + template<typename T> + bool operator<(const context<T>& lhs, const context<T>& rhs) noexcept + { + return std::strcmp(lhs.name, rhs.name) < 0; + } + + template<typename T> + bool operator<(const context<T>& lhs, const boost::string_ref rhs) noexcept + { + return lhs.name < rhs; + } + + struct is_valid + { + bool operator()(const cryptonote::txpool_event& event) const noexcept + { + return event.res; + } + }; + + template<typename T, std::size_t N> + void verify_sorted(const std::array<context<T>, N>& elems, const char* name) + { + auto unsorted = std::is_sorted_until(elems.begin(), elems.end()); + if (unsorted != elems.end()) + throw std::logic_error{name + std::string{" array is not properly sorted, see: "} + unsorted->name}; + } + + void write_header(epee::byte_stream& buf, const boost::string_ref name) + { + buf.write(name.data(), name.size()); + buf.put(':'); + } + + //! \return `name:...` where `...` is JSON and `name` is directly copied (no quotes - not JSON). + template<typename T> + void json_pub(epee::byte_stream& buf, const T value) + { + rapidjson::Writer<epee::byte_stream> dest{buf}; + using cryptonote::json::toJsonValue; + toJsonValue(dest, value); + } + + //! Object for "minimal" block serialization + struct minimal_chain + { + const std::uint64_t height; + const epee::span<const cryptonote::block> blocks; + }; + + //! Object for "minimal" tx serialization + struct minimal_txpool + { + const cryptonote::transaction& tx; + }; + + void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain self) + { + namespace adapt = boost::adaptors; + + const auto to_block_id = [](const cryptonote::block& bl) + { + crypto::hash id; + if (!get_block_hash(bl, id)) + MERROR("ZMQ/Pub failure: get_block_hash"); + return id; + }; + + assert(!self.blocks.empty()); // checked in zmq_pub::send_chain_main + + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, first_height, self.height); + INSERT_INTO_JSON_OBJECT(dest, first_prev_id, self.blocks[0].prev_id); + INSERT_INTO_JSON_OBJECT(dest, ids, (self.blocks | adapt::transformed(to_block_id))); + dest.EndObject(); + } + + void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool self) + { + crypto::hash id{}; + std::size_t blob_size = 0; + if (!get_transaction_hash(self.tx, id, blob_size)) + { + MERROR("ZMQ/Pub failure: get_transaction_hash"); + return; + } + + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, id, id); + INSERT_INTO_JSON_OBJECT(dest, blob_size, blob_size); + dest.EndObject(); + } + + void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks) + { + json_pub(buf, blocks); + } + + void json_minimal_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks) + { + json_pub(buf, minimal_chain{height, blocks}); + } + + // boost::adaptors are in place "views" - no copy/move takes place + // moving transactions (via sort, etc.), is expensive! + + void json_full_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes) + { + namespace adapt = boost::adaptors; + const auto to_full_tx = [](const cryptonote::txpool_event& event) + { + return event.tx; + }; + json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_full_tx))); + } + + void json_minimal_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes) + { + namespace adapt = boost::adaptors; + const auto to_minimal_tx = [](const cryptonote::txpool_event& event) + { + return minimal_txpool{event.tx}; + }; + json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_minimal_tx))); + } + + constexpr const std::array<context<chain_writer>, 2> chain_contexts = + {{ + {u8"json-full-chain_main", json_full_chain}, + {u8"json-minimal-chain_main", json_minimal_chain} + }}; + + constexpr const std::array<context<txpool_writer>, 2> txpool_contexts = + {{ + {u8"json-full-txpool_add", json_full_txpool}, + {u8"json-minimal-txpool_add", json_minimal_txpool} + }}; + + template<typename T, std::size_t N> + epee::span<const context<T>> get_range(const std::array<context<T>, N>& contexts, const boost::string_ref value) + { + const auto not_prefix = [](const boost::string_ref lhs, const context<T>& rhs) + { + return !(boost::string_ref{rhs.name}.starts_with(lhs)); + }; + + const auto lower = std::lower_bound(contexts.begin(), contexts.end(), value); + const auto upper = std::upper_bound(lower, contexts.end(), value, not_prefix); + return {lower, std::size_t(upper - lower)}; + } + + template<std::size_t N, typename T> + void add_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first) + { + assert(range.size() <= N); + assert(range.begin() - first <= N - range.size()); + + for (const auto& ctx : range) + { + const std::size_t i = std::addressof(ctx) - first; + subs[i] = std::min(std::numeric_limits<std::size_t>::max() - 1, subs[i]) + 1; + } + } + + template<std::size_t N, typename T> + void remove_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first) + { + assert(range.size() <= N); + assert(range.begin() - first <= N - range.size()); + + for (const auto& ctx : range) + { + const std::size_t i = std::addressof(ctx) - first; + subs[i] = std::max(std::size_t(1), subs[i]) - 1; + } + } + + template<std::size_t N, typename T, typename... U> + std::array<epee::byte_slice, N> make_pubs(const std::array<std::size_t, N>& subs, const std::array<context<T>, N>& contexts, U&&... args) + { + epee::byte_stream buf{}; + + std::size_t last_offset = 0; + std::array<std::size_t, N> offsets{{}}; + for (std::size_t i = 0; i < N; ++i) + { + if (subs[i]) + { + write_header(buf, contexts[i].name); + contexts[i].generate_pub(buf, std::forward<U>(args)...); + offsets[i] = buf.size() - last_offset; + last_offset = buf.size(); + } + } + + epee::byte_slice bytes{std::move(buf)}; + std::array<epee::byte_slice, N> out; + for (std::size_t i = 0; i < N; ++i) + out[i] = bytes.take_slice(offsets[i]); + + return out; + } + + template<std::size_t N> + std::size_t send_messages(void* const socket, std::array<epee::byte_slice, N>& messages) + { + std::size_t count = 0; + for (epee::byte_slice& message : messages) + { + if (!message.empty()) + { + const expect<void> sent = net::zmq::send(std::move(message), socket, ZMQ_DONTWAIT); + if (!sent) + MERROR("Failed to send ZMQ/Pub message: " << sent.error().message()); + else + ++count; + } + } + return count; + } + + expect<bool> relay_block_pub(void* const relay, void* const pub) noexcept + { + zmq_msg_t msg; + zmq_msg_init(std::addressof(msg)); + MONERO_CHECK(net::zmq::retry_op(zmq_msg_recv, std::addressof(msg), relay, ZMQ_DONTWAIT)); + + const boost::string_ref payload{ + reinterpret_cast<const char*>(zmq_msg_data(std::addressof(msg))), + zmq_msg_size(std::addressof(msg)) + }; + + if (payload == txpool_signal) + { + zmq_msg_close(std::addressof(msg)); + return false; + } + + // forward block messages (serialized on P2P thread for now) + const expect<void> sent = net::zmq::retry_op(zmq_msg_send, std::addressof(msg), pub, ZMQ_DONTWAIT); + if (!sent) + { + zmq_msg_close(std::addressof(msg)); + return sent.error(); + } + return true; + } +} // anonymous + +namespace cryptonote { namespace listener +{ + +zmq_pub::zmq_pub(void* context) + : relay_(), + chain_subs_{{0}}, + txpool_subs_{{0}}, + sync_() +{ + if (!context) + throw std::logic_error{"ZMQ context cannot be NULL"}; + + verify_sorted(chain_contexts, "chain_contexts"); + verify_sorted(txpool_contexts, "txpool_contexts"); + + relay_.reset(zmq_socket(context, ZMQ_PAIR)); + if (!relay_) + MONERO_ZMQ_THROW("Failed to create relay socket"); + if (zmq_connect(relay_.get(), relay_endpoint()) != 0) + MONERO_ZMQ_THROW("Failed to connect relay socket"); +} + +zmq_pub::~zmq_pub() +{} + +bool zmq_pub::sub_request(boost::string_ref message) +{ + if (!message.empty()) + { + const char tag = message[0]; + message.remove_prefix(1); + + const auto chain_range = get_range(chain_contexts, message); + const auto txpool_range = get_range(txpool_contexts, message); + + if (!chain_range.empty() || !txpool_range.empty()) + { + MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " << + chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)"); + + const boost::lock_guard<boost::mutex> lock{sync_}; + switch (tag) + { + case 0: + remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); + return true; + case 1: + add_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); + return true; + default: + break; + } + } + } + MERROR("Invalid ZMQ/Sub message"); + return false; +} + +bool zmq_pub::relay_to_pub(void* const relay, void* const pub) +{ + const expect<bool> relayed = relay_block_pub(relay, pub); + if (!relayed) + { + MERROR("Error relaying ZMQ/Pub: " << relayed.error().message()); + return false; + } + + if (!*relayed) + { + std::array<std::size_t, 2> subs; + std::vector<cryptonote::txpool_event> events; + { + const boost::lock_guard<boost::mutex> lock{sync_}; + if (txes_.empty()) + return false; + + subs = txpool_subs_; + events = std::move(txes_.front()); + txes_.pop_front(); + } + auto messages = make_pubs(subs, txpool_contexts, epee::to_span(events)); + send_messages(pub, messages); + MDEBUG("Sent txpool ZMQ/Pub"); + } + else + MDEBUG("Sent chain_main ZMQ/Pub"); + + return true; +} + +std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::span<const cryptonote::block> blocks) +{ + if (blocks.empty()) + return 0; + + /* Block format only sends one block at a time - multiple block notifications + are less common and only occur on rollbacks. */ + + boost::unique_lock<boost::mutex> guard{sync_}; + + const auto subs_copy = chain_subs_; + guard.unlock(); + + for (const std::size_t sub : subs_copy) + { + if (sub) + { + /* cryptonote_core/blockchain.cpp cannot "give" us the block like core + does for txpool events. Since copying the block is expensive anyway, + serialization is done right here on the p2p thread (for now). */ + + auto messages = make_pubs(subs_copy, chain_contexts, height, blocks); + guard.lock(); + return send_messages(relay_.get(), messages); + } + } + return 0; +} + +std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes) +{ + if (txes.empty()) + return 0; + + const boost::lock_guard<boost::mutex> lock{sync_}; + for (const std::size_t sub : txpool_subs_) + { + if (sub) + { + const expect<void> sent = net::zmq::retry_op(zmq_send_const, relay_.get(), txpool_signal, sizeof(txpool_signal) - 1, ZMQ_DONTWAIT); + if (sent) + txes_.emplace_back(std::move(txes)); + else + MERROR("ZMQ/Pub failure, relay queue error: " << sent.error().message()); + return bool(sent); + } + } + return 0; +} + +void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<const cryptonote::block> blocks) const +{ + const std::shared_ptr<zmq_pub> self = self_.lock(); + if (self) + self->send_chain_main(height, blocks); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + +void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const +{ + const std::shared_ptr<zmq_pub> self = self_.lock(); + if (self) + self->send_txpool_add(std::move(txes)); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + +}} diff --git a/src/rpc/zmq_pub.h b/src/rpc/zmq_pub.h new file mode 100644 index 000000000..02e6b8103 --- /dev/null +++ b/src/rpc/zmq_pub.h @@ -0,0 +1,110 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. + // +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include <array> +#include <boost/thread/mutex.hpp> +#include <boost/utility/string_ref.hpp> +#include <cstdint> +#include <deque> +#include <memory> +#include <vector> + +#include "cryptonote_basic/fwd.h" +#include "net/zmq.h" +#include "span.h" + +namespace cryptonote { namespace listener +{ +/*! \brief Sends ZMQ PUB messages on cryptonote events + + Clients must ensure that all transaction(s) are notified before any blocks + they are contained in, and must ensure that each block is notified in chain + order. An external lock **must** be held by clients during the entire + txpool check and notification sequence and (a possibly second) lock is held + during the entire block check and notification sequence. Otherwise, events + could be sent in a different order than processed. */ +class zmq_pub +{ + /* Each socket has its own internal queue. So we can only use one socket, else + the messages being published are not guaranteed to be in the same order + pushed. */ + + net::zmq::socket relay_; + std::deque<std::vector<txpool_event>> txes_; + std::array<std::size_t, 2> chain_subs_; + std::array<std::size_t, 2> txpool_subs_; + boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays. + + public: + //! \return Name of ZMQ_PAIR endpoint for pub notifications + static constexpr const char* relay_endpoint() noexcept { return "inproc://pub_relay"; } + + explicit zmq_pub(void* context); + + zmq_pub(const zmq_pub&) = delete; + zmq_pub(zmq_pub&&) = delete; + + ~zmq_pub(); + + zmq_pub& operator=(const zmq_pub&) = delete; + zmq_pub& operator=(zmq_pub&&) = delete; + + //! Process a client subscription request (from XPUB sockets). Thread-safe. + bool sub_request(const boost::string_ref message); + + /*! Forward ZMQ messages sent to `relay` via `send_chain_main` or + `send_txpool_add` to `pub`. Used by `ZmqServer`. */ + bool relay_to_pub(void* relay, void* pub); + + /*! Send a `ZMQ_PUB` notification for a change to the main chain. + Thread-safe. + \return Number of ZMQ messages sent to relay. */ + std::size_t send_chain_main(std::uint64_t height, epee::span<const cryptonote::block> blocks); + + /*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local + pool. Thread-safe. + \return Number of ZMQ messages sent to relay. */ + std::size_t send_txpool_add(std::vector<cryptonote::txpool_event> txes); + + //! Callable for `send_chain_main` with weak ownership to `zmq_pub` object. + struct chain_main + { + std::weak_ptr<zmq_pub> self_; + void operator()(std::uint64_t height, epee::span<const cryptonote::block> blocks) const; + }; + + //! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object. + struct txpool_add + { + std::weak_ptr<zmq_pub> self_; + void operator()(std::vector<cryptonote::txpool_event> txes) const; + }; + }; +}} diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 1a9f49c01..6105b7f3a 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -29,10 +29,16 @@ #include "zmq_server.h" #include <chrono> -#include <cstdint> +#include <cstring> +#include <utility> +#include <stdexcept> #include <system_error> #include "byte_slice.h" +#include "rpc/zmq_pub.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq" namespace cryptonote { @@ -42,14 +48,57 @@ namespace constexpr const int num_zmq_threads = 1; constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages -} + + net::zmq::socket init_socket(void* context, int type, epee::span<const std::string> addresses) + { + if (context == nullptr) + throw std::logic_error{"NULL context provided"}; + + net::zmq::socket out{}; + out.reset(zmq_socket(context, type)); + if (!out) + { + MONERO_LOG_ZMQ_ERROR("Failed to create ZMQ socket"); + return nullptr; + } + + if (zmq_setsockopt(out.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); + return nullptr; + } + + static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); + if (zmq_setsockopt(out.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); + return nullptr; + } + + for (const std::string& address : addresses) + { + if (zmq_bind(out.get(), address.c_str()) < 0) + { + MONERO_LOG_ZMQ_ERROR("ZMQ bind failed"); + return nullptr; + } + MINFO("ZMQ now listening at " << address); + } + + return out; + } +} // anonymous namespace rpc { ZmqServer::ZmqServer(RpcHandler& h) : handler(h), - context(zmq_init(num_zmq_threads)) + context(zmq_init(num_zmq_threads)), + rep_socket(nullptr), + pub_socket(nullptr), + relay_socket(nullptr), + shared_state(nullptr) { if (!context) MONERO_ZMQ_THROW("Unable to create ZMQ context"); @@ -64,22 +113,59 @@ void ZmqServer::serve() try { // socket must close before `zmq_term` will exit. - const net::zmq::socket socket = std::move(rep_socket); - if (!socket) + const net::zmq::socket rep = std::move(rep_socket); + const net::zmq::socket pub = std::move(pub_socket); + const net::zmq::socket relay = std::move(relay_socket); + const std::shared_ptr<listener::zmq_pub> state = std::move(shared_state); + + const unsigned init_count = unsigned(bool(pub)) + bool(relay) + bool(state); + if (!rep || (init_count && init_count != 3)) { - MERROR("ZMQ RPC server reply socket is null"); + MERROR("ZMQ RPC server socket is null"); return; } + MINFO("ZMQ Server started"); + + const int read_flags = pub ? ZMQ_DONTWAIT : 0; + std::array<zmq_pollitem_t, 3> sockets = + {{ + {relay.get(), 0, ZMQ_POLLIN, 0}, + {pub.get(), 0, ZMQ_POLLIN, 0}, + {rep.get(), 0, ZMQ_POLLIN, 0} + }}; + + /* This uses XPUB to watch for subscribers, to reduce CPU cycles for + serialization when the data will be dropped. This is important for block + serialization, which is done on the p2p threads currently (see + zmq_pub.cpp). + + XPUB sockets are not thread-safe, so the p2p thread cannot write into + the socket while we read here for subscribers. A ZMQ_PAIR socket is + used for inproc notification. No data is every copied to kernel, it is + all userspace messaging. */ + while (1) { - const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get())); - MDEBUG("Received RPC request: \"" << message << "\""); - epee::byte_slice response = handler.handle(message); + if (pub) + MONERO_UNWRAP(net::zmq::retry_op(zmq_poll, sockets.data(), sockets.size(), -1)); - const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()}; - MDEBUG("Sending RPC reply: \"" << response_view << "\""); - MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get())); + if (sockets[0].revents) + state->relay_to_pub(relay.get(), pub.get()); + + if (sockets[1].revents) + state->sub_request(MONERO_UNWRAP(net::zmq::receive(pub.get(), ZMQ_DONTWAIT))); + + if (!pub || sockets[2].revents) + { + const std::string message = MONERO_UNWRAP(net::zmq::receive(rep.get(), read_flags)); + MDEBUG("Received RPC request: \"" << message << "\""); + epee::byte_slice response = handler.handle(message); + + const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()}; + MDEBUG("Sending RPC reply: \"" << response_view << "\""); + MONERO_UNWRAP(net::zmq::send(std::move(response), rep.get())); + } } } catch (const std::system_error& e) @@ -97,38 +183,12 @@ void ZmqServer::serve() } } -bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port) -{ - MERROR("ZmqServer::addIPCSocket not yet implemented!"); - return false; -} - -bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) +void* ZmqServer::init_rpc(boost::string_ref address, boost::string_ref port) { if (!context) { MERROR("ZMQ RPC Server already shutdown"); - return false; - } - - rep_socket.reset(zmq_socket(context.get(), ZMQ_REP)); - if (!rep_socket) - { - MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed"); - return false; - } - - if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) - { - MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); - return false; - } - - static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); - if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) - { - MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); - return false; + return nullptr; } if (address.empty()) @@ -141,12 +201,34 @@ bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) bind_address += ":"; bind_address.append(port.data(), port.size()); - if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0) + rep_socket = init_socket(context.get(), ZMQ_REP, {std::addressof(bind_address), 1}); + return bool(rep_socket) ? context.get() : nullptr; +} + +std::shared_ptr<listener::zmq_pub> ZmqServer::init_pub(epee::span<const std::string> addresses) +{ + try + { + shared_state = std::make_shared<listener::zmq_pub>(context.get()); + pub_socket = init_socket(context.get(), ZMQ_XPUB, addresses); + if (!pub_socket) + throw std::runtime_error{"Unable to initialize ZMQ_XPUB socket"}; + + const std::string relay_address[] = {listener::zmq_pub::relay_endpoint()}; + relay_socket = init_socket(context.get(), ZMQ_PAIR, relay_address); + if (!relay_socket) + throw std::runtime_error{"Unable to initialize ZMQ_PAIR relay"}; + } + catch (const std::runtime_error& e) { - MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed"); - return false; + shared_state = nullptr; + pub_socket = nullptr; + relay_socket = nullptr; + MERROR("Failed to create ZMQ/Pub listener: " << e.what()); + return nullptr; } - return true; + + return shared_state; } void ZmqServer::run() @@ -163,7 +245,6 @@ void ZmqServer::stop() run_thread.join(); } - } // namespace cryptonote } // namespace rpc diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h index 1143db839..ddf44b411 100644 --- a/src/rpc/zmq_server.h +++ b/src/rpc/zmq_server.h @@ -30,10 +30,16 @@ #include <boost/thread/thread.hpp> #include <boost/utility/string_ref.hpp> +#include <cstdint> +#include <memory> +#include <string> #include "common/command_line.h" +#include "cryptonote_basic/fwd.h" #include "net/zmq.h" -#include "rpc_handler.h" +#include "rpc/fwd.h" +#include "rpc/rpc_handler.h" +#include "span.h" namespace cryptonote { @@ -41,7 +47,7 @@ namespace cryptonote namespace rpc { -class ZmqServer +class ZmqServer final { public: @@ -49,12 +55,13 @@ class ZmqServer ~ZmqServer(); - static void init_options(boost::program_options::options_description& desc); - void serve(); - bool addIPCSocket(boost::string_ref address, boost::string_ref port); - bool addTCPSocket(boost::string_ref address, boost::string_ref port); + //! \return ZMQ context on success, `nullptr` on failure + void* init_rpc(boost::string_ref address, boost::string_ref port); + + //! \return `nullptr` on errors. + std::shared_ptr<listener::zmq_pub> init_pub(epee::span<const std::string> addresses); void run(); void stop(); @@ -67,9 +74,11 @@ class ZmqServer boost::thread run_thread; net::zmq::socket rep_socket; + net::zmq::socket pub_socket; + net::zmq::socket relay_socket; + std::shared_ptr<listener::zmq_pub> shared_state; }; - } // namespace cryptonote } // namespace rpc diff --git a/src/serialization/json_object.h b/src/serialization/json_object.h index 2a9b63b08..e016bef41 100644 --- a/src/serialization/json_object.h +++ b/src/serialization/json_object.h @@ -356,7 +356,7 @@ inline typename std::enable_if<sfinae::is_vector_like<Vec>::value, void>::type t dest.StartArray(); for (const auto& t : vec) toJsonValue(dest, t); - dest.EndArray(vec.size()); + dest.EndArray(); } template <typename Vec> diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index 7e6432766..a5984b2c9 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -110,6 +110,7 @@ target_link_libraries(unit_tests cryptonote_protocol cryptonote_core daemon_messages + daemon_rpc_server blockchain_db lmdb_lib rpc diff --git a/tests/unit_tests/json_serialization.cpp b/tests/unit_tests/json_serialization.cpp index 5873d0ab6..1db923f7b 100644 --- a/tests/unit_tests/json_serialization.cpp +++ b/tests/unit_tests/json_serialization.cpp @@ -15,7 +15,7 @@ #include "serialization/json_object.h" -namespace +namespace test { cryptonote::transaction make_miner_transaction(cryptonote::account_public_address const& to) @@ -82,7 +82,10 @@ namespace return tx; } +} +namespace +{ template<typename T> T test_json(const T& value) { @@ -109,7 +112,7 @@ TEST(JsonSerialization, MinerTransaction) { cryptonote::account_base acct; acct.generate(); - const auto miner_tx = make_miner_transaction(acct.get_keys().m_account_address); + const auto miner_tx = test::make_miner_transaction(acct.get_keys().m_account_address); crypto::hash tx_hash{}; ASSERT_TRUE(cryptonote::get_transaction_hash(miner_tx, tx_hash)); @@ -137,8 +140,8 @@ TEST(JsonSerialization, RegularTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, false, false ); @@ -168,8 +171,8 @@ TEST(JsonSerialization, RingctTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, false ); @@ -199,8 +202,8 @@ TEST(JsonSerialization, BulletproofTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, true ); diff --git a/tests/unit_tests/json_serialization.h b/tests/unit_tests/json_serialization.h new file mode 100644 index 000000000..2d8267261 --- /dev/null +++ b/tests/unit_tests/json_serialization.h @@ -0,0 +1,42 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace test +{ + cryptonote::transaction make_miner_transaction(cryptonote::account_public_address const& to); + + cryptonote::transaction + make_transaction( + cryptonote::account_keys const& from, + std::vector<cryptonote::transaction> const& sources, + std::vector<cryptonote::account_public_address> const& destinations, + bool rct, + bool bulletproof); +} diff --git a/tests/unit_tests/levin.cpp b/tests/unit_tests/levin.cpp index d9d273837..15563e764 100644 --- a/tests/unit_tests/levin.cpp +++ b/tests/unit_tests/levin.cpp @@ -680,6 +680,76 @@ TEST_F(levin_notify, local_without_padding) } } +TEST_F(levin_notify, forward_without_padding) +{ + cryptonote::levin::notify notifier = make_notifier(0, true, false); + + for (unsigned count = 0; count < 10; ++count) + add_connection(count % 2 == 0); + + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + notifier.new_out_connection(); + io_service_.poll(); + + std::vector<cryptonote::blobdata> txs(2); + txs[0].resize(100, 'f'); + txs[1].resize(200, 'e'); + + std::vector<cryptonote::blobdata> sorted_txs = txs; + std::sort(sorted_txs.begin(), sorted_txs.end()); + + ASSERT_EQ(10u, contexts_.size()); + bool has_stemmed = false; + bool has_fluffed = false; + while (!has_stemmed || !has_fluffed) + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), events_, cryptonote::relay_method::forward)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + const bool is_stem = events_.has_stem_txes(); + EXPECT_EQ(txs, events_.take_relayed(is_stem ? cryptonote::relay_method::stem : cryptonote::relay_method::fluff)); + + if (!is_stem) + { + notifier.run_fluff(); + ASSERT_LT(0u, io_service_.poll()); + } + + std::size_t send_count = 0; + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + { + const std::size_t sent = context->process_send_queue(); + if (sent && is_stem) + EXPECT_EQ(1u, (context - contexts_.begin()) % 2); + send_count += sent; + } + + EXPECT_EQ(is_stem ? 1u : 9u, send_count); + ASSERT_EQ(is_stem ? 1u : 9u, receiver_.notified_size()); + for (unsigned count = 0; count < (is_stem ? 1u : 9u); ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + if (is_stem) + EXPECT_EQ(txs, notification.txs); + else + EXPECT_EQ(sorted_txs, notification.txs); + EXPECT_TRUE(notification._.empty()); + EXPECT_EQ(!is_stem, notification.dandelionpp_fluff); + } + + has_stemmed |= is_stem; + has_fluffed |= !is_stem; + notifier.run_epoch(); + } +} + TEST_F(levin_notify, block_without_padding) { cryptonote::levin::notify notifier = make_notifier(0, true, false); @@ -918,6 +988,73 @@ TEST_F(levin_notify, local_with_padding) } } +TEST_F(levin_notify, forward_with_padding) +{ + cryptonote::levin::notify notifier = make_notifier(0, true, true); + + for (unsigned count = 0; count < 10; ++count) + add_connection(count % 2 == 0); + + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + notifier.new_out_connection(); + io_service_.poll(); + + std::vector<cryptonote::blobdata> txs(2); + txs[0].resize(100, 'e'); + txs[1].resize(200, 'f'); + + ASSERT_EQ(10u, contexts_.size()); + bool has_stemmed = false; + bool has_fluffed = false; + while (!has_stemmed || !has_fluffed) + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), events_, cryptonote::relay_method::forward)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + const bool is_stem = events_.has_stem_txes(); + EXPECT_EQ(txs, events_.take_relayed(is_stem ? cryptonote::relay_method::stem : cryptonote::relay_method::fluff)); + + if (!is_stem) + { + notifier.run_fluff(); + ASSERT_LT(0u, io_service_.poll()); + } + + std::size_t send_count = 0; + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + { + const std::size_t sent = context->process_send_queue(); + if (sent && is_stem) + { + EXPECT_EQ(1u, (context - contexts_.begin()) % 2); + EXPECT_FALSE(context->is_incoming()); + } + send_count += sent; + } + + EXPECT_EQ(is_stem ? 1u : 9u, send_count); + ASSERT_EQ(is_stem ? 1u : 9u, receiver_.notified_size()); + for (unsigned count = 0; count < (is_stem ? 1u : 9u); ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_FALSE(notification._.empty()); + EXPECT_EQ(!is_stem, notification.dandelionpp_fluff); + } + + has_stemmed |= is_stem; + has_fluffed |= !is_stem; + notifier.run_epoch(); + } +} + TEST_F(levin_notify, block_with_padding) { cryptonote::levin::notify notifier = make_notifier(0, true, true); @@ -1021,7 +1158,7 @@ TEST_F(levin_notify, private_fluff_without_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_TRUE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } @@ -1057,7 +1194,7 @@ TEST_F(levin_notify, private_stem_without_padding) io_service_.reset(); ASSERT_LT(0u, io_service_.poll()); - EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff)); + EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::stem)); EXPECT_EQ(0u, context->process_send_queue()); for (++context; context != contexts_.end(); ++context) @@ -1072,7 +1209,7 @@ TEST_F(levin_notify, private_stem_without_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_TRUE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } @@ -1123,7 +1260,58 @@ TEST_F(levin_notify, private_local_without_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_TRUE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); + } + } +} + +TEST_F(levin_notify, private_forward_without_padding) +{ + // private mode always uses fluff but marked as stem + cryptonote::levin::notify notifier = make_notifier(0, false, false); + + for (unsigned count = 0; count < 10; ++count) + add_connection(count % 2 == 0); + + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + notifier.new_out_connection(); + io_service_.poll(); + + std::vector<cryptonote::blobdata> txs(2); + txs[0].resize(100, 'e'); + txs[1].resize(200, 'f'); + + ASSERT_EQ(10u, contexts_.size()); + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), events_, cryptonote::relay_method::forward)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + notifier.run_fluff(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + + EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::forward)); + + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + { + const bool is_incoming = ((context - contexts_.begin()) % 2 == 0); + EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue()); + } + + ASSERT_EQ(5u, receiver_.notified_size()); + for (unsigned count = 0; count < 5; ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_TRUE(notification._.empty()); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } @@ -1233,7 +1421,7 @@ TEST_F(levin_notify, private_fluff_with_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_FALSE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } @@ -1268,7 +1456,7 @@ TEST_F(levin_notify, private_stem_with_padding) io_service_.reset(); ASSERT_LT(0u, io_service_.poll()); - EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff)); + EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::stem)); EXPECT_EQ(0u, context->process_send_queue()); for (++context; context != contexts_.end(); ++context) @@ -1283,7 +1471,7 @@ TEST_F(levin_notify, private_stem_with_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_FALSE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } @@ -1333,7 +1521,57 @@ TEST_F(levin_notify, private_local_with_padding) auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; EXPECT_EQ(txs, notification.txs); EXPECT_FALSE(notification._.empty()); - EXPECT_FALSE(notification.dandelionpp_fluff); + EXPECT_TRUE(notification.dandelionpp_fluff); + } + } +} + +TEST_F(levin_notify, private_forward_with_padding) +{ + cryptonote::levin::notify notifier = make_notifier(0, false, true); + + for (unsigned count = 0; count < 10; ++count) + add_connection(count % 2 == 0); + + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + notifier.new_out_connection(); + io_service_.poll(); + + std::vector<cryptonote::blobdata> txs(2); + txs[0].resize(100, 'e'); + txs[1].resize(200, 'f'); + + ASSERT_EQ(10u, contexts_.size()); + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), events_, cryptonote::relay_method::forward)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + notifier.run_fluff(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + + EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::forward)); + + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + { + const bool is_incoming = ((context - contexts_.begin()) % 2 == 0); + EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue()); + } + + ASSERT_EQ(5u, receiver_.notified_size()); + for (unsigned count = 0; count < 5; ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_FALSE(notification._.empty()); + EXPECT_TRUE(notification.dandelionpp_fluff); } } } diff --git a/tests/unit_tests/zmq_rpc.cpp b/tests/unit_tests/zmq_rpc.cpp index af1f1608b..1d065fc45 100644 --- a/tests/unit_tests/zmq_rpc.cpp +++ b/tests/unit_tests/zmq_rpc.cpp @@ -26,11 +26,25 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include <boost/preprocessor/stringize.hpp> #include <gtest/gtest.h> +#include <rapidjson/document.h> +#include "cryptonote_basic/account.h" +#include "cryptonote_basic/cryptonote_basic.h" +#include "cryptonote_basic/events.h" +#include "cryptonote_basic/cryptonote_format_utils.h" +#include "json_serialization.h" +#include "net/zmq.h" #include "rpc/message.h" +#include "rpc/zmq_pub.h" +#include "rpc/zmq_server.h" #include "serialization/json_object.h" +#define MASSERT(...) \ + if (!(__VA_ARGS__)) \ + return testing::AssertionFailure() << BOOST_PP_STRINGIZE(__VA_ARGS__) + TEST(ZmqFullMessage, InvalidRequest) { EXPECT_THROW( @@ -53,3 +67,711 @@ TEST(ZmqFullMessage, Request) cryptonote::rpc::FullMessage parsed{request, true}; EXPECT_STREQ("foo", parsed.getRequestType().c_str()); } + +namespace +{ + using published_json = std::pair<std::string, rapidjson::Document>; + + constexpr const char inproc_pub[] = "inproc://dummy_pub"; + + net::zmq::socket create_socket(void* ctx, const char* address) + { + net::zmq::socket sock{zmq_socket(ctx, ZMQ_PAIR)}; + if (!sock) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_bind(sock.get(), address) != 0) + MONERO_ZMQ_THROW("socket bind failure"); + return sock; + } + + std::vector<std::string> get_messages(void* socket, int count = -1) + { + std::vector<std::string> out; + for ( ; count || count < 0; --count) + { + expect<std::string> next = net::zmq::receive(socket, (count < 0 ? ZMQ_DONTWAIT : 0)); + if (next == net::zmq::make_error_code(EAGAIN)) + return out; + out.push_back(std::move(*next)); + } + return out; + } + + std::vector<published_json> get_published(void* socket, int count = -1) + { + std::vector<published_json> out; + + const auto messages = get_messages(socket, count); + out.reserve(messages.size()); + + for (const std::string& message : messages) + { + const char* split = std::strchr(message.c_str(), ':'); + if (!split) + throw std::runtime_error{"Invalid ZMQ/Pub message"}; + + out.emplace_back(); + out.back().first = {message.c_str(), split}; + if (out.back().second.Parse(split + 1).HasParseError()) + throw std::runtime_error{"Failed to parse ZMQ/Pub message"}; + } + + return out; + } + + testing::AssertionResult compare_full_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub) + { + MASSERT(pub.first == "json-full-txpool_add"); + MASSERT(pub.second.IsArray()); + MASSERT(pub.second.Size() <= events.size()); + + std::size_t i = 0; + for (const cryptonote::txpool_event& event : events) + { + MASSERT(i <= pub.second.Size()); + if (!event.res) + continue; + + cryptonote::transaction tx{}; + cryptonote::json::fromJsonValue(pub.second[i], tx); + + crypto::hash id{}; + MASSERT(cryptonote::get_transaction_hash(event.tx, id)); + MASSERT(cryptonote::get_transaction_hash(tx, id)); + MASSERT(event.tx.hash == tx.hash); + ++i; + } + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_minimal_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub) + { + MASSERT(pub.first == "json-minimal-txpool_add"); + MASSERT(pub.second.IsArray()); + MASSERT(pub.second.Size() <= events.size()); + + std::size_t i = 0; + for (const cryptonote::txpool_event& event : events) + { + MASSERT(i <= pub.second.Size()); + if (!event.res) + continue; + + std::size_t actual_size = 0; + crypto::hash actual_id{}; + + MASSERT(pub.second[i].IsObject()); + GET_FROM_JSON_OBJECT(pub.second[i], actual_id, id); + GET_FROM_JSON_OBJECT(pub.second[i], actual_size, blob_size); + + std::size_t expected_size = 0; + crypto::hash expected_id{}; + MASSERT(cryptonote::get_transaction_hash(event.tx, expected_id, expected_size)); + MASSERT(expected_size == actual_size); + MASSERT(expected_id == actual_id); + ++i; + } + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_full_block(const epee::span<const cryptonote::block> expected, const published_json& pub) + { + MASSERT(pub.first == "json-full-chain_main"); + MASSERT(pub.second.IsArray()); + + std::vector<cryptonote::block> actual; + cryptonote::json::fromJsonValue(pub.second, actual); + + MASSERT(expected.size() == actual.size()); + + for (std::size_t i = 0; i < expected.size(); ++i) + { + crypto::hash id; + MASSERT(cryptonote::get_block_hash(expected[i], id)); + MASSERT(cryptonote::get_block_hash(actual[i], id)); + MASSERT(expected[i].hash == actual[i].hash); + } + + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_minimal_block(std::size_t height, const epee::span<const cryptonote::block> expected, const published_json& pub) + { + MASSERT(pub.first == "json-minimal-chain_main"); + MASSERT(pub.second.IsObject()); + MASSERT(!expected.empty()); + + std::size_t actual_height = 0; + crypto::hash actual_id{}; + crypto::hash actual_prev_id{}; + std::vector<crypto::hash> actual_ids{}; + GET_FROM_JSON_OBJECT(pub.second, actual_height, first_height); + GET_FROM_JSON_OBJECT(pub.second, actual_prev_id, first_prev_id); + GET_FROM_JSON_OBJECT(pub.second, actual_ids, ids); + + MASSERT(height == actual_height); + MASSERT(expected[0].prev_id == actual_prev_id); + MASSERT(expected.size() == actual_ids.size()); + + for (std::size_t i = 0; i < expected.size(); ++i) + { + crypto::hash id; + MASSERT(cryptonote::get_block_hash(expected[i], id)); + MASSERT(id == actual_ids[i]); + } + + return testing::AssertionSuccess(); + } + + struct zmq_base : public testing::Test + { + cryptonote::account_base acct; + + zmq_base() + : testing::Test(), acct() + { + acct.generate(); + } + + cryptonote::transaction make_miner_transaction() + { + return test::make_miner_transaction(acct.get_keys().m_account_address); + } + + cryptonote::transaction make_transaction(const std::vector<cryptonote::account_public_address>& destinations) + { + return test::make_transaction(acct.get_keys(), {make_miner_transaction()}, destinations, true, true); + } + + cryptonote::transaction make_transaction() + { + cryptonote::account_base temp_account; + temp_account.generate(); + return make_transaction({temp_account.get_keys().m_account_address}); + } + + cryptonote::block make_block() + { + cryptonote::block block{}; + block.major_version = 1; + block.minor_version = 3; + block.timestamp = 100; + block.prev_id = crypto::rand<crypto::hash>(); + block.nonce = 100; + block.miner_tx = make_miner_transaction(); + return block; + } + }; + + struct zmq_pub : public zmq_base + { + net::zmq::context ctx; + net::zmq::socket relay; + net::zmq::socket dummy_pub; + net::zmq::socket dummy_client; + std::shared_ptr<cryptonote::listener::zmq_pub> pub; + + zmq_pub() + : zmq_base(), + ctx(zmq_init(1)), + relay(create_socket(ctx.get(), cryptonote::listener::zmq_pub::relay_endpoint())), + dummy_pub(create_socket(ctx.get(), inproc_pub)), + dummy_client(zmq_socket(ctx.get(), ZMQ_PAIR)), + pub(std::make_shared<cryptonote::listener::zmq_pub>(ctx.get())) + { + if (!dummy_client) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_connect(dummy_client.get(), inproc_pub) != 0) + MONERO_ZMQ_THROW("failed to connect to dummy pub"); + } + + virtual void TearDown() override final + { + EXPECT_EQ(0u, get_messages(relay.get()).size()); + EXPECT_EQ(0u, get_messages(dummy_client.get()).size()); + } + + template<std::size_t N> + bool sub_request(const char (&topic)[N]) + { + return pub->sub_request({topic, N - 1}); + } + }; + + struct dummy_handler final : cryptonote::rpc::RpcHandler + { + dummy_handler() + : cryptonote::rpc::RpcHandler() + {} + + virtual epee::byte_slice handle(const std::string& request) override final + { + throw std::logic_error{"not implemented"}; + } + }; + + struct zmq_server : public zmq_base + { + dummy_handler handler; + cryptonote::rpc::ZmqServer server; + std::shared_ptr<cryptonote::listener::zmq_pub> pub; + net::zmq::socket sub; + + zmq_server() + : zmq_base(), + handler(), + server(handler), + pub(), + sub() + { + void* ctx = server.init_rpc({}, {}); + if (!ctx) + throw std::runtime_error{"init_rpc failure"}; + + const std::string endpoint = inproc_pub; + pub = server.init_pub({std::addressof(endpoint), 1}); + if (!pub) + throw std::runtime_error{"failed to initiaze zmq/pub"}; + + sub.reset(zmq_socket(ctx, ZMQ_SUB)); + if (!sub) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_connect(sub.get(), inproc_pub) != 0) + MONERO_ZMQ_THROW("failed to connect to dummy pub"); + + server.run(); + } + + virtual void TearDown() override final + { + EXPECT_EQ(0u, get_messages(sub.get()).size()); + sub.reset(); + pub.reset(); + server.stop(); + } + + template<std::size_t N> + void subscribe(const char (&topic)[N]) + { + if (zmq_setsockopt(sub.get(), ZMQ_SUBSCRIBE, topic, N - 1) != 0) + MONERO_ZMQ_THROW("failed to subscribe"); + } + }; +} + +TEST_F(zmq_pub, InvalidContext) +{ + EXPECT_THROW(cryptonote::listener::zmq_pub{nullptr}, std::logic_error); +} + +TEST_F(zmq_pub, NoBlocking) +{ + EXPECT_FALSE(pub->relay_to_pub(relay.get(), dummy_pub.get())); +} + +TEST_F(zmq_pub, DefaultDrop) +{ + EXPECT_EQ(0u, pub->send_txpool_add({{make_transaction(), {}, true}})); + + const cryptonote::block bl = make_block(); + EXPECT_EQ(0u,pub->send_chain_main(5, {std::addressof(bl), 1})); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(5, {std::addressof(bl), 1})); +} + +TEST_F(zmq_pub, JsonFullTxpool) +{ + static constexpr const char topic[] = "\1json-full-txpool_add"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); +} + +TEST_F(zmq_pub, JsonMinimalTxpool) +{ + static constexpr const char topic[] = "\1json-minimal-txpool_add"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); +} + +TEST_F(zmq_pub, JsonFullChain) +{ + static constexpr const char topic[] = "\1json-full-chain_main"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); +} + +TEST_F(zmq_pub, JsonMinimalChain) +{ + static constexpr const char topic[] = "\1json-minimal-chain_main"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front())); +} + +TEST_F(zmq_pub, JsonFullAll) +{ + static constexpr const char topic[] = "\1json-full"; + + ASSERT_TRUE(sub_request(topic)); + { + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + } + { + const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + } +} + +TEST_F(zmq_pub, JsonMinimalAll) +{ + static constexpr const char topic[] = "\1json-minimal"; + + ASSERT_TRUE(sub_request(topic)); + + { + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + } + { + const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front())); + } +} + +TEST_F(zmq_pub, JsonAll) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + { + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + } + { + const std::array<cryptonote::block, 1> blocks{{make_block()}}; + + EXPECT_EQ(2u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.back())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.back())); + } +} + +TEST_F(zmq_pub, JsonChainWeakPtrSkip) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array<cryptonote::block, 1> blocks{{make_block()}}; + + pub.reset(); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); +} + +TEST_F(zmq_pub, JsonTxpoolWeakPtrSkip) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + pub.reset(); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(std::move(events))); +} + +TEST_F(zmq_server, pub) +{ + subscribe("json-minimal"); + + std::vector<cryptonote::txpool_event> events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + const std::array<cryptonote::block, 1> blocks{{make_block()}}; + + ASSERT_EQ(1u, pub->send_txpool_add(events)); + ASSERT_EQ(1u, pub->send_chain_main(200, epee::to_span(blocks))); + + auto pubs = get_published(sub.get(), 2); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_block(200, epee::to_span(blocks), pubs.back())); +} |