aboutsummaryrefslogblamecommitdiff
path: root/src/rpc/zmq_server.cpp
blob: 1ee55673eca592380d758d6bc7f0d36a0cc6dbab (plain) (tree)
1
                                              


























                                                                                          
 


                       

                    





                                                                                                 



                                     
                                      
 
                                                         


                       


                       
     
   

                                                          
     
                                                    
     
             
     




                                                                                         
     











                                                     

   
                                                                                           



                                                         
                                                                               
 
               
   

                                              
 




                                                                
 



                                                                                                                        
 



                                                                                                            
   










                                                           
   
                                                       





                     



                                                                   
                             
 
                                                             
                    




                          
// Copyright (c) 2016-2019, The Monero Project
// 
// All rights reserved.
// 
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
// 
// 1. Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the following disclaimer.
// 
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
//    of conditions and the following disclaimer in the documentation and/or other
//    materials provided with the distribution.
// 
// 3. Neither the name of the copyright holder nor the names of its contributors may be
//    used to endorse or promote products derived from this software without specific
//    prior written permission.
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "zmq_server.h"

#include <chrono>
#include <cstdint>
#include <system_error>

namespace cryptonote
{

namespace
{
  constexpr const int num_zmq_threads = 1;
  constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
  constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
}

namespace rpc
{

ZmqServer::ZmqServer(RpcHandler& h) :
    handler(h),
    context(zmq_init(num_zmq_threads))
{
    if (!context)
        MONERO_ZMQ_THROW("Unable to create ZMQ context");
}

ZmqServer::~ZmqServer()
{
}

void ZmqServer::serve()
{
  try
  {
    // socket must close before `zmq_term` will exit.
    const net::zmq::socket socket = std::move(rep_socket);
    if (!socket)
    {
      MERROR("ZMQ RPC server reply socket is null");
      return;
    }

    while (1)
    {
      const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
      MDEBUG("Received RPC request: \"" << message << "\"");
      const std::string& response = handler.handle(message);

      MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
      MDEBUG("Sent RPC reply: \"" << response << "\"");
    }
  }
  catch (const std::system_error& e)
  {
    if (e.code() != net::zmq::make_error_code(ETERM))
      MERROR("ZMQ RPC Server Error: " << e.what());
  }
  catch (const std::exception& e)
  {
    MERROR("ZMQ RPC Server Error: " << e.what());
  }
  catch (...)
  {
    MERROR("Unknown error in ZMQ RPC server");
  }
}

bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
  MERROR("ZmqServer::addIPCSocket not yet implemented!");
  return false;
}

bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
  if (!context)
  {
    MERROR("ZMQ RPC Server already shutdown");
    return false;
  }

  rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
  if (!rep_socket)
  {
    MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
    return false;
  }

  if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
  {
    MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
    return false;
  }

  static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
  if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
  {
    MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
    return false;
  }

  if (address.empty())
    address = "*";
  if (port.empty())
    port = "*";

  std::string bind_address = "tcp://";
  bind_address.append(address.data(), address.size());
  bind_address += ":";
  bind_address.append(port.data(), port.size());

  if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
  {
    MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
    return false;
  }
  return true;
}

void ZmqServer::run()
{
  run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}

void ZmqServer::stop()
{
  if (!run_thread.joinable())
    return;

  context.reset(); // destroying context terminates all calls
  run_thread.join();
}


}  // namespace cryptonote

}  // namespace rpc