aboutsummaryrefslogtreecommitdiff
path: root/tests/unit_tests/zmq_rpc.cpp
diff options
context:
space:
mode:
authorLee Clagett <code@leeclagett.com>2020-03-16 23:59:26 +0000
committerLee Clagett <code@leeclagett.com>2020-05-04 02:06:35 +0000
commite5214a2ca22cecf123bcff1ab441ed0415d08a6f (patch)
tree75a052bb95f6087421c8fedde549d6930c5af847 /tests/unit_tests/zmq_rpc.cpp
parentMerge pull request #6586 (diff)
downloadmonero-e5214a2ca22cecf123bcff1ab441ed0415d08a6f.tar.xz
Adding ZMQ/Pub support for txpool_add and chain_main events
Diffstat (limited to 'tests/unit_tests/zmq_rpc.cpp')
-rw-r--r--tests/unit_tests/zmq_rpc.cpp722
1 files changed, 722 insertions, 0 deletions
diff --git a/tests/unit_tests/zmq_rpc.cpp b/tests/unit_tests/zmq_rpc.cpp
index af1f1608b..1d065fc45 100644
--- a/tests/unit_tests/zmq_rpc.cpp
+++ b/tests/unit_tests/zmq_rpc.cpp
@@ -26,11 +26,25 @@
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#include <boost/preprocessor/stringize.hpp>
#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+#include "cryptonote_basic/account.h"
+#include "cryptonote_basic/cryptonote_basic.h"
+#include "cryptonote_basic/events.h"
+#include "cryptonote_basic/cryptonote_format_utils.h"
+#include "json_serialization.h"
+#include "net/zmq.h"
#include "rpc/message.h"
+#include "rpc/zmq_pub.h"
+#include "rpc/zmq_server.h"
#include "serialization/json_object.h"
+#define MASSERT(...) \
+ if (!(__VA_ARGS__)) \
+ return testing::AssertionFailure() << BOOST_PP_STRINGIZE(__VA_ARGS__)
+
TEST(ZmqFullMessage, InvalidRequest)
{
EXPECT_THROW(
@@ -53,3 +67,711 @@ TEST(ZmqFullMessage, Request)
cryptonote::rpc::FullMessage parsed{request, true};
EXPECT_STREQ("foo", parsed.getRequestType().c_str());
}
+
+namespace
+{
+ using published_json = std::pair<std::string, rapidjson::Document>;
+
+ constexpr const char inproc_pub[] = "inproc://dummy_pub";
+
+ net::zmq::socket create_socket(void* ctx, const char* address)
+ {
+ net::zmq::socket sock{zmq_socket(ctx, ZMQ_PAIR)};
+ if (!sock)
+ MONERO_ZMQ_THROW("failed to create socket");
+ if (zmq_bind(sock.get(), address) != 0)
+ MONERO_ZMQ_THROW("socket bind failure");
+ return sock;
+ }
+
+ std::vector<std::string> get_messages(void* socket, int count = -1)
+ {
+ std::vector<std::string> out;
+ for ( ; count || count < 0; --count)
+ {
+ expect<std::string> next = net::zmq::receive(socket, (count < 0 ? ZMQ_DONTWAIT : 0));
+ if (next == net::zmq::make_error_code(EAGAIN))
+ return out;
+ out.push_back(std::move(*next));
+ }
+ return out;
+ }
+
+ std::vector<published_json> get_published(void* socket, int count = -1)
+ {
+ std::vector<published_json> out;
+
+ const auto messages = get_messages(socket, count);
+ out.reserve(messages.size());
+
+ for (const std::string& message : messages)
+ {
+ const char* split = std::strchr(message.c_str(), ':');
+ if (!split)
+ throw std::runtime_error{"Invalid ZMQ/Pub message"};
+
+ out.emplace_back();
+ out.back().first = {message.c_str(), split};
+ if (out.back().second.Parse(split + 1).HasParseError())
+ throw std::runtime_error{"Failed to parse ZMQ/Pub message"};
+ }
+
+ return out;
+ }
+
+ testing::AssertionResult compare_full_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub)
+ {
+ MASSERT(pub.first == "json-full-txpool_add");
+ MASSERT(pub.second.IsArray());
+ MASSERT(pub.second.Size() <= events.size());
+
+ std::size_t i = 0;
+ for (const cryptonote::txpool_event& event : events)
+ {
+ MASSERT(i <= pub.second.Size());
+ if (!event.res)
+ continue;
+
+ cryptonote::transaction tx{};
+ cryptonote::json::fromJsonValue(pub.second[i], tx);
+
+ crypto::hash id{};
+ MASSERT(cryptonote::get_transaction_hash(event.tx, id));
+ MASSERT(cryptonote::get_transaction_hash(tx, id));
+ MASSERT(event.tx.hash == tx.hash);
+ ++i;
+ }
+ return testing::AssertionSuccess();
+ }
+
+ testing::AssertionResult compare_minimal_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub)
+ {
+ MASSERT(pub.first == "json-minimal-txpool_add");
+ MASSERT(pub.second.IsArray());
+ MASSERT(pub.second.Size() <= events.size());
+
+ std::size_t i = 0;
+ for (const cryptonote::txpool_event& event : events)
+ {
+ MASSERT(i <= pub.second.Size());
+ if (!event.res)
+ continue;
+
+ std::size_t actual_size = 0;
+ crypto::hash actual_id{};
+
+ MASSERT(pub.second[i].IsObject());
+ GET_FROM_JSON_OBJECT(pub.second[i], actual_id, id);
+ GET_FROM_JSON_OBJECT(pub.second[i], actual_size, blob_size);
+
+ std::size_t expected_size = 0;
+ crypto::hash expected_id{};
+ MASSERT(cryptonote::get_transaction_hash(event.tx, expected_id, expected_size));
+ MASSERT(expected_size == actual_size);
+ MASSERT(expected_id == actual_id);
+ ++i;
+ }
+ return testing::AssertionSuccess();
+ }
+
+ testing::AssertionResult compare_full_block(const epee::span<const cryptonote::block> expected, const published_json& pub)
+ {
+ MASSERT(pub.first == "json-full-chain_main");
+ MASSERT(pub.second.IsArray());
+
+ std::vector<cryptonote::block> actual;
+ cryptonote::json::fromJsonValue(pub.second, actual);
+
+ MASSERT(expected.size() == actual.size());
+
+ for (std::size_t i = 0; i < expected.size(); ++i)
+ {
+ crypto::hash id;
+ MASSERT(cryptonote::get_block_hash(expected[i], id));
+ MASSERT(cryptonote::get_block_hash(actual[i], id));
+ MASSERT(expected[i].hash == actual[i].hash);
+ }
+
+ return testing::AssertionSuccess();
+ }
+
+ testing::AssertionResult compare_minimal_block(std::size_t height, const epee::span<const cryptonote::block> expected, const published_json& pub)
+ {
+ MASSERT(pub.first == "json-minimal-chain_main");
+ MASSERT(pub.second.IsObject());
+ MASSERT(!expected.empty());
+
+ std::size_t actual_height = 0;
+ crypto::hash actual_id{};
+ crypto::hash actual_prev_id{};
+ std::vector<crypto::hash> actual_ids{};
+ GET_FROM_JSON_OBJECT(pub.second, actual_height, first_height);
+ GET_FROM_JSON_OBJECT(pub.second, actual_prev_id, first_prev_id);
+ GET_FROM_JSON_OBJECT(pub.second, actual_ids, ids);
+
+ MASSERT(height == actual_height);
+ MASSERT(expected[0].prev_id == actual_prev_id);
+ MASSERT(expected.size() == actual_ids.size());
+
+ for (std::size_t i = 0; i < expected.size(); ++i)
+ {
+ crypto::hash id;
+ MASSERT(cryptonote::get_block_hash(expected[i], id));
+ MASSERT(id == actual_ids[i]);
+ }
+
+ return testing::AssertionSuccess();
+ }
+
+ struct zmq_base : public testing::Test
+ {
+ cryptonote::account_base acct;
+
+ zmq_base()
+ : testing::Test(), acct()
+ {
+ acct.generate();
+ }
+
+ cryptonote::transaction make_miner_transaction()
+ {
+ return test::make_miner_transaction(acct.get_keys().m_account_address);
+ }
+
+ cryptonote::transaction make_transaction(const std::vector<cryptonote::account_public_address>& destinations)
+ {
+ return test::make_transaction(acct.get_keys(), {make_miner_transaction()}, destinations, true, true);
+ }
+
+ cryptonote::transaction make_transaction()
+ {
+ cryptonote::account_base temp_account;
+ temp_account.generate();
+ return make_transaction({temp_account.get_keys().m_account_address});
+ }
+
+ cryptonote::block make_block()
+ {
+ cryptonote::block block{};
+ block.major_version = 1;
+ block.minor_version = 3;
+ block.timestamp = 100;
+ block.prev_id = crypto::rand<crypto::hash>();
+ block.nonce = 100;
+ block.miner_tx = make_miner_transaction();
+ return block;
+ }
+ };
+
+ struct zmq_pub : public zmq_base
+ {
+ net::zmq::context ctx;
+ net::zmq::socket relay;
+ net::zmq::socket dummy_pub;
+ net::zmq::socket dummy_client;
+ std::shared_ptr<cryptonote::listener::zmq_pub> pub;
+
+ zmq_pub()
+ : zmq_base(),
+ ctx(zmq_init(1)),
+ relay(create_socket(ctx.get(), cryptonote::listener::zmq_pub::relay_endpoint())),
+ dummy_pub(create_socket(ctx.get(), inproc_pub)),
+ dummy_client(zmq_socket(ctx.get(), ZMQ_PAIR)),
+ pub(std::make_shared<cryptonote::listener::zmq_pub>(ctx.get()))
+ {
+ if (!dummy_client)
+ MONERO_ZMQ_THROW("failed to create socket");
+ if (zmq_connect(dummy_client.get(), inproc_pub) != 0)
+ MONERO_ZMQ_THROW("failed to connect to dummy pub");
+ }
+
+ virtual void TearDown() override final
+ {
+ EXPECT_EQ(0u, get_messages(relay.get()).size());
+ EXPECT_EQ(0u, get_messages(dummy_client.get()).size());
+ }
+
+ template<std::size_t N>
+ bool sub_request(const char (&topic)[N])
+ {
+ return pub->sub_request({topic, N - 1});
+ }
+ };
+
+ struct dummy_handler final : cryptonote::rpc::RpcHandler
+ {
+ dummy_handler()
+ : cryptonote::rpc::RpcHandler()
+ {}
+
+ virtual epee::byte_slice handle(const std::string& request) override final
+ {
+ throw std::logic_error{"not implemented"};
+ }
+ };
+
+ struct zmq_server : public zmq_base
+ {
+ dummy_handler handler;
+ cryptonote::rpc::ZmqServer server;
+ std::shared_ptr<cryptonote::listener::zmq_pub> pub;
+ net::zmq::socket sub;
+
+ zmq_server()
+ : zmq_base(),
+ handler(),
+ server(handler),
+ pub(),
+ sub()
+ {
+ void* ctx = server.init_rpc({}, {});
+ if (!ctx)
+ throw std::runtime_error{"init_rpc failure"};
+
+ const std::string endpoint = inproc_pub;
+ pub = server.init_pub({std::addressof(endpoint), 1});
+ if (!pub)
+ throw std::runtime_error{"failed to initiaze zmq/pub"};
+
+ sub.reset(zmq_socket(ctx, ZMQ_SUB));
+ if (!sub)
+ MONERO_ZMQ_THROW("failed to create socket");
+ if (zmq_connect(sub.get(), inproc_pub) != 0)
+ MONERO_ZMQ_THROW("failed to connect to dummy pub");
+
+ server.run();
+ }
+
+ virtual void TearDown() override final
+ {
+ EXPECT_EQ(0u, get_messages(sub.get()).size());
+ sub.reset();
+ pub.reset();
+ server.stop();
+ }
+
+ template<std::size_t N>
+ void subscribe(const char (&topic)[N])
+ {
+ if (zmq_setsockopt(sub.get(), ZMQ_SUBSCRIBE, topic, N - 1) != 0)
+ MONERO_ZMQ_THROW("failed to subscribe");
+ }
+ };
+}
+
+TEST_F(zmq_pub, InvalidContext)
+{
+ EXPECT_THROW(cryptonote::listener::zmq_pub{nullptr}, std::logic_error);
+}
+
+TEST_F(zmq_pub, NoBlocking)
+{
+ EXPECT_FALSE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+}
+
+TEST_F(zmq_pub, DefaultDrop)
+{
+ EXPECT_EQ(0u, pub->send_txpool_add({{make_transaction(), {}, true}}));
+
+ const cryptonote::block bl = make_block();
+ EXPECT_EQ(0u,pub->send_chain_main(5, {std::addressof(bl), 1}));
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(5, {std::addressof(bl), 1}));
+}
+
+TEST_F(zmq_pub, JsonFullTxpool)
+{
+ static constexpr const char topic[] = "\1json-full-txpool_add";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ EXPECT_NO_THROW(pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+}
+
+TEST_F(zmq_pub, JsonMinimalTxpool)
+{
+ static constexpr const char topic[] = "\1json-minimal-txpool_add";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ EXPECT_NO_THROW(pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+}
+
+TEST_F(zmq_pub, JsonFullChain)
+{
+ static constexpr const char topic[] = "\1json-full-chain_main";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
+
+ EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+}
+
+TEST_F(zmq_pub, JsonMinimalChain)
+{
+ static constexpr const char topic[] = "\1json-minimal-chain_main";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
+
+ EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front()));
+}
+
+TEST_F(zmq_pub, JsonFullAll)
+{
+ static constexpr const char topic[] = "\1json-full";
+
+ ASSERT_TRUE(sub_request(topic));
+ {
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+ }
+ {
+ const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
+
+ EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+ }
+}
+
+TEST_F(zmq_pub, JsonMinimalAll)
+{
+ static constexpr const char topic[] = "\1json-minimal";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ {
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+ }
+ {
+ const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
+
+ EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(1u, pubs.size());
+ ASSERT_LE(1u, pubs.size());
+ EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front()));
+ }
+}
+
+TEST_F(zmq_pub, JsonAll)
+{
+ static constexpr const char topic[] = "\1json";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ {
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
+
+ events.at(0).res = false;
+ EXPECT_EQ(1u, pub->send_txpool_add(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
+
+ events.at(0).res = false;
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
+ }
+ {
+ const std::array<cryptonote::block, 1> blocks{{make_block()}};
+
+ EXPECT_EQ(2u, pub->send_chain_main(100, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ auto pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+ EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.back()));
+
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+ EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
+
+ pubs = get_published(dummy_client.get());
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
+ EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.back()));
+ }
+}
+
+TEST_F(zmq_pub, JsonChainWeakPtrSkip)
+{
+ static constexpr const char topic[] = "\1json";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ const std::array<cryptonote::block, 1> blocks{{make_block()}};
+
+ pub.reset();
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
+}
+
+TEST_F(zmq_pub, JsonTxpoolWeakPtrSkip)
+{
+ static constexpr const char topic[] = "\1json";
+
+ ASSERT_TRUE(sub_request(topic));
+
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ pub.reset();
+ EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(std::move(events)));
+}
+
+TEST_F(zmq_server, pub)
+{
+ subscribe("json-minimal");
+
+ std::vector<cryptonote::txpool_event> events
+ {
+ {make_transaction(), {}, true}, {make_transaction(), {}, true}
+ };
+
+ const std::array<cryptonote::block, 1> blocks{{make_block()}};
+
+ ASSERT_EQ(1u, pub->send_txpool_add(events));
+ ASSERT_EQ(1u, pub->send_chain_main(200, epee::to_span(blocks)));
+
+ auto pubs = get_published(sub.get(), 2);
+ EXPECT_EQ(2u, pubs.size());
+ ASSERT_LE(2u, pubs.size());
+ EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
+ EXPECT_TRUE(compare_minimal_block(200, epee::to_span(blocks), pubs.back()));
+}