// Copyright (c) 2020, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "zmq_pub.h"
#include <algorithm>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/thread/locks.hpp>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <stdexcept>
#include <string>
#include <utility>
#include "common/expect.h"
#include "crypto/crypto.h"
#include "cryptonote_basic/cryptonote_format_utils.h"
#include "cryptonote_basic/events.h"
#include "misc_log_ex.h"
#include "serialization/json_object.h"
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
namespace
{
//! Payload sent on the relay socket by zmq_pub::send_txpool_add to announce
//! that a batch of txpool events was queued; relay_block_pub compares incoming
//! relay messages against it.
constexpr const char txpool_signal[] = "tx_signal";
//! Signature of a chain (block) serializer: output buffer, a height, and the blocks.
using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>);
//! Signature of a txpool serializer: output buffer and the txpool events.
using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>);
//! Pairs a publication topic `name` with the function that serializes it.
//! `F` is the serializer's function type (e.g. `chain_writer`).
template<typename F>
struct context
{
  const char* const name; //!< NUL-terminated topic name, used as the sort key
  F* generate_pub;        //!< Serializer invoked to produce the payload
};
//! Orders two contexts by byte-wise comparison of their topic names.
template<typename T>
bool operator<(const context<T>& lhs, const context<T>& rhs) noexcept
{
  const int order = std::strcmp(lhs.name, rhs.name);
  return order < 0;
}
//! Orders a context against a bare topic string; lets `std::lower_bound`
//! search an array of contexts with a `boost::string_ref` key.
template<typename T>
bool operator<(const context<T>& lhs, const boost::string_ref rhs) noexcept
{
  const boost::string_ref topic{lhs.name};
  return topic.compare(rhs) < 0;
}
//! Predicate over txpool events; yields the event's `res` flag. Used with
//! `boost::adaptors::filtered` so only events with `res` set are published.
struct is_valid
{
bool operator()(const cryptonote::txpool_event& event) const noexcept
{
return event.res;
}
};
/*! Ensure `elems` is sorted according to `operator<` on `context<T>`.
    \param elems Context table to check.
    \param name Human-readable table name, used as the prefix of the error text.
    \throw std::logic_error naming the first out-of-order entry. */
template<typename T, std::size_t N>
void verify_sorted(const std::array<context<T>, N>& elems, const char* name)
{
  const auto first_bad = std::is_sorted_until(elems.begin(), elems.end());
  if (first_bad == elems.end())
    return;

  std::string what{name};
  what += " array is not properly sorted, see: ";
  what += first_bad->name;
  throw std::logic_error{what};
}
//! Append the raw bytes of `name` followed by a single ':' separator to `buf`.
void write_header(epee::byte_stream& buf, const boost::string_ref name)
{
  const char* const bytes = name.data();
  const std::size_t length = name.size();
  buf.write(bytes, length);
  buf.put(':');
}
/*! Serialize `value` as JSON and append it to `buf`. Only the JSON payload is
    written here; the `name:` topic prefix is emitted separately (see
    `write_header`, called from `make_pubs`).
    \param buf Destination stream, wrapped by a rapidjson writer.
    \param value Object to serialize; a matching `toJsonValue` overload is
      located via `cryptonote::json` or argument-dependent lookup. */
template<typename T>
void json_pub(epee::byte_stream& buf, const T value)
{
  using cryptonote::json::toJsonValue;
  rapidjson::Writer<epee::byte_stream> writer{buf};
  toJsonValue(writer, value);
}
//! Object for "minimal" block serialization
struct minimal_chain
{
//! Serialized under the `first_height` key by the matching `toJsonValue`.
const std::uint64_t height;
//! Blocks to summarize; `blocks[0].prev_id` and each block's hash are serialized.
const epee::span<const cryptonote::block> blocks;
};
//! Object for "minimal" tx serialization
struct minimal_txpool
{
//! Non-owning reference to the transaction being summarized; the referenced
//! transaction must outlive this object.
const cryptonote::transaction& tx;
};
/*! Write `self` as a JSON object with the keys `first_height`,
    `first_prev_id` (the `prev_id` of the first block) and `ids` (the hash of
    every block, computed lazily while the array is written).
    \param dest JSON writer receiving the object.
    \param self Height plus a non-empty span of blocks. */
void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain self)
{
  namespace adapt = boost::adaptors;
  const auto to_block_id = [](const cryptonote::block& bl)
  {
    // Value-initialized (like the id in the minimal_txpool overload) so that a
    // failed hash computation never publishes indeterminate bytes.
    crypto::hash id{};
    if (!get_block_hash(bl, id))
      MERROR("ZMQ/Pub failure: get_block_hash");
    return id;
  };
  assert(!self.blocks.empty()); // checked in zmq_pub::send_chain_main
  dest.StartObject();
  INSERT_INTO_JSON_OBJECT(dest, first_height, self.height);
  INSERT_INTO_JSON_OBJECT(dest, first_prev_id, self.blocks[0].prev_id);
  INSERT_INTO_JSON_OBJECT(dest, ids, (self.blocks | adapt::transformed(to_block_id)));
  dest.EndObject();
}
/*! Write the transaction referenced by `self` as a JSON object with the keys
    `id` (transaction hash) and `blob_size`. If the hash cannot be computed an
    error is logged and nothing is written to `dest`.
    \param dest JSON writer receiving the object.
    \param self Wrapper around the transaction to summarize. */
void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool self)
{
  crypto::hash tx_id{};
  std::size_t tx_blob_size = 0;
  if (get_transaction_hash(self.tx, tx_id, tx_blob_size))
  {
    dest.StartObject();
    INSERT_INTO_JSON_OBJECT(dest, id, tx_id);
    INSERT_INTO_JSON_OBJECT(dest, blob_size, tx_blob_size);
    dest.EndObject();
  }
  else
  {
    MERROR("ZMQ/Pub failure: get_transaction_hash");
  }
}
//! `chain_writer` that serializes every block in `blocks` in full.
//! `height` is accepted only to satisfy the `chain_writer` signature.
void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
{
  static_cast<void>(height); // intentionally unused
  json_pub(buf, blocks);
}
//! `chain_writer` that serializes only the `minimal_chain` summary of
//! `blocks` (height, first previous id, and block ids).
void json_minimal_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
{
  const minimal_chain summary{height, blocks};
  json_pub(buf, summary);
}
// boost::adaptors are in place "views" - no copy/move takes place
// moving transactions (via sort, etc.), is expensive!
//! `txpool_writer` that serializes the full transaction of every event
//! accepted by `is_valid`.
void json_full_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
{
  namespace adapt = boost::adaptors;
  // The explicit reference return type matters: a deduced return type would be
  // `cryptonote::transaction` by value, copying every transaction while the
  // view is iterated. The referenced events live in `txes` for the whole call.
  const auto to_full_tx = [](const cryptonote::txpool_event& event) -> const cryptonote::transaction&
  {
    return event.tx;
  };
  json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_full_tx)));
}
//! `txpool_writer` that serializes a `minimal_txpool` summary for every event
//! accepted by `is_valid`. The summaries only reference the transactions held
//! by `txes`; no transaction is copied.
void json_minimal_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
{
  namespace adapt = boost::adaptors;
  const auto summarize = [](const cryptonote::txpool_event& accepted) -> minimal_txpool
  {
    return minimal_txpool{accepted.tx};
  };
  json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(summarize)));
}
//! Chain (block) topics and their serializers. Entries must stay sorted by
//! `name`: the zmq_pub constructor checks this with verify_sorted, and
//! get_range searches the array with std::lower_bound.
constexpr const std::array<context<chain_writer>, 2> chain_contexts =
{{
{u8"json-full-chain_main", json_full_chain},
{u8"json-minimal-chain_main", json_minimal_chain}
}};
//! Txpool topics and their serializers. Entries must stay sorted by `name`:
//! the zmq_pub constructor checks this with verify_sorted, and get_range
//! searches the array with std::lower_bound.
constexpr const std::array<context<txpool_writer>, 2> txpool_contexts =
{{
{u8"json-full-txpool_add", json_full_txpool},
{u8"json-minimal-txpool_add", json_minimal_txpool}
}};
/*! Find every context whose topic name begins with `value`.
    \param contexts Table sorted by name (binary searches are used).
    \param value Topic prefix requested by a subscriber.
    \return Contiguous sub-range of `contexts` whose names start with `value`;
      empty when nothing matches. */
template<typename T, std::size_t N>
epee::span<const context<T>> get_range(const std::array<context<T>, N>& contexts, const boost::string_ref value)
{
  // Comparator for upper_bound: true for the first entry that no longer
  // carries `prefix` at the start of its name.
  const auto lacks_prefix = [](const boost::string_ref prefix, const context<T>& candidate)
  {
    const boost::string_ref topic{candidate.name};
    return !topic.starts_with(prefix);
  };

  const auto first = std::lower_bound(contexts.begin(), contexts.end(), value);
  const auto last = std::upper_bound(first, contexts.end(), value, lacks_prefix);
  const std::size_t count = static_cast<std::size_t>(last - first);
  return {first, count};
}
/*! Increment the subscriber count of every context in `range`, saturating at
    the maximum `std::size_t` value instead of wrapping.
    \param subs Per-context subscriber counts, indexed like the context table.
    \param range Sub-range of the context table being subscribed to.
    \param first Pointer to the first element of the full context table; used
      to turn each element of `range` into an index into `subs`. */
template<std::size_t N, typename T>
void add_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
{
  assert(range.size() <= N);
  assert(static_cast<unsigned long>(range.begin() - first) <= N - range.size());

  constexpr std::size_t limit = std::numeric_limits<std::size_t>::max();
  for (const context<T>& entry : range)
  {
    std::size_t& count = subs[std::addressof(entry) - first];
    if (count != limit)
      ++count;
  }
}
/*! Decrement the subscriber count of every context in `range`, stopping at
    zero instead of wrapping.
    \param subs Per-context subscriber counts, indexed like the context table.
    \param range Sub-range of the context table being unsubscribed from.
    \param first Pointer to the first element of the full context table; used
      to turn each element of `range` into an index into `subs`. */
template<std::size_t N, typename T>
void remove_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
{
  assert(range.size() <= N);
  assert(static_cast<unsigned long>(range.begin() - first) <= N - range.size());

  for (const context<T>& entry : range)
  {
    std::size_t& count = subs[std::addressof(entry) - first];
    if (count != 0)
      --count;
  }
}
/*! Serialize one message per subscribed context.
    All messages are written back-to-back into a single buffer which is then
    cut into per-context slices, so only one buffer is built for the batch.
    \param subs Subscriber count per context; contexts with a zero count are
      skipped and yield an empty slice.
    \param contexts Topic table; `contexts[i]` pairs with `subs[i]`.
    \param args Arguments handed to every context's `generate_pub`.
    \return One slice per context, in table order, each holding
      `name:payload` or nothing. */
template<std::size_t N, typename T, typename... U>
std::array<epee::byte_slice, N> make_pubs(const std::array<std::size_t, N>& subs, const std::array<context<T>, N>& contexts, U&&... args)
{
  epee::byte_stream buf{};
  std::size_t last_offset = 0;
  std::array<std::size_t, N> offsets{{}};
  for (std::size_t i = 0; i < N; ++i)
  {
    if (subs[i])
    {
      write_header(buf, contexts[i].name);
      // Deliberately NOT std::forward: the same pack is reused on every
      // iteration, so forwarding an rvalue could leave later writers with a
      // moved-from argument.
      contexts[i].generate_pub(buf, args...);
      offsets[i] = buf.size() - last_offset;
      last_offset = buf.size();
    }
  }

  epee::byte_slice bytes{std::move(buf)};
  std::array<epee::byte_slice, N> out;
  for (std::size_t i = 0; i < N; ++i)
    out[i] = bytes.take_slice(offsets[i]); // zero length for skipped contexts
  return out;
}
template<std::size_t N>
std::size_t send_messages(void* const socket, std::array<epee::byte_slice, N>& messages)
{
std::size_t count = 0;
for (epee::byte_slice& message : messages)
{
if (!message.empty())
{
const expect<void> sent = net::zmq::send(std::move(message), socket, ZMQ_DONTWAIT);
if (!sent)
MERROR("Failed to send ZMQ/Pub message: " << sent.error().message());
else
++count;
}
}
return count;
}
/*! Receive one message from `relay` and, unless it is the txpool signal,
    forward it unchanged to `pub`. Neither operation blocks.
    \param relay Socket to read the pending message from.
    \param pub Socket that block messages are forwarded to.
    \return `true` if a message was forwarded to `pub`, `false` if the message
      was the txpool signal (nothing forwarded), or the error reported by the
      receive/send operation. */
expect<bool> relay_block_pub(void* const relay, void* const pub) noexcept
{
  zmq_msg_t msg;
  zmq_msg_init(std::addressof(msg));

  const expect<void> received = net::zmq::retry_op(zmq_msg_recv, std::addressof(msg), relay, ZMQ_DONTWAIT);
  if (!received)
  {
    // The message was initialized above, so release it on this path too.
    zmq_msg_close(std::addressof(msg));
    return received.error();
  }

  const boost::string_ref payload{
    reinterpret_cast<const char*>(zmq_msg_data(std::addressof(msg))),
    zmq_msg_size(std::addressof(msg))
  };

  if (payload == txpool_signal)
  {
    zmq_msg_close(std::addressof(msg));
    return false;
  }

  // forward block messages (serialized on P2P thread for now)
  const expect<void> sent = net::zmq::retry_op(zmq_msg_send, std::addressof(msg), pub, ZMQ_DONTWAIT);
  if (!sent)
  {
    zmq_msg_close(std::addressof(msg));
    return sent.error();
  }
  // No zmq_msg_close here: the message was handed over by the successful send.
  return true;
}
} // anonymous
namespace cryptonote { namespace listener
{
/*! Create the publisher and connect its relay socket.
    \param context ZMQ context used to create the `ZMQ_PAIR` relay socket.
    \throw std::logic_error if `context` is null or a context table is not
      sorted. Socket creation/connection failures are reported through
      `MONERO_ZMQ_THROW`. */
zmq_pub::zmq_pub(void* context)
  : relay_(),
    chain_subs_{{0}},
    txpool_subs_{{0}},
    sync_()
{
  if (context == nullptr)
    throw std::logic_error{"ZMQ context cannot be NULL"};

  // get_range relies on binary search, so the tables must be sorted.
  verify_sorted(chain_contexts, "chain_contexts");
  verify_sorted(txpool_contexts, "txpool_contexts");

  relay_.reset(zmq_socket(context, ZMQ_PAIR));
  if (!relay_)
    MONERO_ZMQ_THROW("Failed to create relay socket");

  const int connected = zmq_connect(relay_.get(), relay_endpoint());
  if (connected != 0)
    MONERO_ZMQ_THROW("Failed to connect relay socket");
}
//! Nothing to release by hand; members are destroyed by their own destructors.
zmq_pub::~zmq_pub() = default;
/*! Process a subscription message.
    The first byte is the tag (`1` subscribe, `0` unsubscribe); the remainder
    is a topic prefix matched against the chain and txpool context tables.
    \param message Raw subscription message.
    \return `true` if the tag was valid and at least one topic matched (the
      subscriber counts were updated), `false` otherwise (an error is logged). */
bool zmq_pub::sub_request(boost::string_ref message)
{
  if (!message.empty())
  {
    const char tag = message[0];
    message.remove_prefix(1);

    // Validate the tag before logging or locking, so an unknown tag is never
    // reported as a subscription and never takes the mutex needlessly.
    const bool subscribing = (tag == 1);
    if (subscribing || tag == 0)
    {
      const auto chain_range = get_range(chain_contexts, message);
      const auto txpool_range = get_range(txpool_contexts, message);
      if (!chain_range.empty() || !txpool_range.empty())
      {
        MDEBUG("Client " << (subscribing ? "subscribed" : "unsubscribed") << " to " <<
          chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)");

        const boost::lock_guard<boost::mutex> lock{sync_};
        if (subscribing)
        {
          add_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
          add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
        }
        else
        {
          remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
          remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
        }
        return true;
      }
    }
  }
  MERROR("Invalid ZMQ/Sub message");
  return false;
}
/*! Move one pending relay message to the publish socket.
    Block messages arrive already serialized and are forwarded as-is. A txpool
    signal instead pops the oldest queued event batch, serializes it for the
    currently subscribed txpool topics, and sends the result on `pub`.
    \param relay Socket to read the pending message from.
    \param pub Socket to publish on.
    \return `false` if relaying failed or a txpool signal arrived with no
      queued events; `true` otherwise. */
bool zmq_pub::relay_to_pub(void* const relay, void* const pub)
{
  const expect<bool> relayed = relay_block_pub(relay, pub);
  if (!relayed)
  {
    MERROR("Error relaying ZMQ/Pub: " << relayed.error().message());
    return false;
  }

  if (*relayed)
  {
    MDEBUG("Sent chain_main ZMQ/Pub");
    return true;
  }

  std::array<std::size_t, 2> active;
  std::vector<cryptonote::txpool_event> batch;
  {
    // Hold the lock only long enough to snapshot the counts and take a batch.
    const boost::lock_guard<boost::mutex> lock{sync_};
    if (txes_.empty())
      return false;

    active = txpool_subs_;
    batch = std::move(txes_.front());
    txes_.pop_front();
  }

  auto messages = make_pubs(active, txpool_contexts, epee::to_span(batch));
  send_messages(pub, messages);
  MDEBUG("Sent txpool ZMQ/Pub");
  return true;
}
/*! Serialize `blocks` for every subscribed chain topic and queue the messages
    on the relay socket.
    \param height Height passed through to the chain serializers.
    \param blocks Blocks to publish; nothing is done when empty.
    \return Number of messages sent on the relay socket (0 when `blocks` is
      empty or no chain topic has subscribers). */
std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
{
  if (blocks.empty())
    return 0;

  /* Block format only sends one block at a time - multiple block notifications
     are less common and only occur on rollbacks. */

  // Snapshot the counts under the lock, then serialize without holding it.
  boost::unique_lock<boost::mutex> guard{sync_};
  const auto active = chain_subs_;
  guard.unlock();

  const bool wanted = std::any_of(active.begin(), active.end(), [](const std::size_t count) { return count != 0; });
  if (!wanted)
    return 0;

  /* cryptonote_core/blockchain.cpp cannot "give" us the block like core
     does for txpool events. Since copying the block is expensive anyway,
     serialization is done right here on the p2p thread (for now). */
  auto messages = make_pubs(active, chain_contexts, height, blocks);

  // Re-acquire the lock for the relay socket; released when `guard` is destroyed.
  guard.lock();
  return send_messages(relay_.get(), messages);
}
std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes)
{
if (txes.empty())
return 0;
const boost::lock_guard<boost::mutex> lock{sync_};
for (const std::size_t sub : txpool_subs_)
{
if (sub)
{
const expect<void> sent = net::zmq::retry_op(zmq_send_const, relay_.get(), txpool_signal, sizeof(txpool_signal) - 1, ZMQ_DONTWAIT);
if (sent)
txes_.emplace_back(std::move(txes));
else
MERROR("ZMQ/Pub failure, relay queue error: " << sent.error().message());
return bool(sent);
}
}
return 0;
}
//! Callback that forwards a chain notification to the publisher, if the
//! publisher is still alive; otherwise an error is logged.
void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<const cryptonote::block> blocks) const
{
  const std::shared_ptr<zmq_pub> server = self_.lock();
  if (!server)
  {
    MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
    return;
  }
  server->send_chain_main(height, blocks);
}
//! Callback that hands a batch of txpool events to the publisher, if the
//! publisher is still alive; otherwise an error is logged.
void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const
{
  const std::shared_ptr<zmq_pub> server = self_.lock();
  if (!server)
  {
    MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
    return;
  }
  server->send_txpool_add(std::move(txes));
}
}}