Skip to content

Commit

Permalink
use grpc_server and grpc_client
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-christophe81 committed May 23, 2024
1 parent a04fa54 commit e8cf6bd
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 185 deletions.
6 changes: 5 additions & 1 deletion broker/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ set(MODULE_DIR "${PROJECT_SOURCE_DIR}/grpc")
set(INC_DIR "${MODULE_DIR}/inc/com/centreon/broker/grpc")
set(SRC_DIR "${MODULE_DIR}/src")
set(TEST_DIR "${MODULE_DIR}/test")
include_directories(${MODULE_DIR}/inc ${SRC_DIR} ${CMAKE_SOURCE_DIR}/bbdo)
include_directories(${MODULE_DIR}/inc
${SRC_DIR}
${CMAKE_SOURCE_DIR}/bbdo
${CMAKE_SOURCE_DIR}/common/grpc/inc)

# Sources.
set(SOURCES
Expand All @@ -43,6 +46,7 @@ add_library(${GRPC} SHARED ${SOURCES} )
set_target_properties(${GRPC} PROPERTIES PREFIX "")
target_link_libraries(
${GRPC}
centreon_grpc
"-Wl,--whole-archive"
pb_neb_lib
pb_storage_lib
Expand Down
5 changes: 3 additions & 2 deletions broker/grpc/inc/com/centreon/broker/grpc/acceptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define CCB_GRPC_ACCEPTOR_HH

#include "com/centreon/broker/io/endpoint.hh"
#include "com/centreon/common/grpc/grpc_server.hh"
#include "grpc_config.hh"

namespace com::centreon::broker::grpc {
Expand Down Expand Up @@ -60,9 +61,9 @@ class service_impl
void unregister(const std::shared_ptr<io::stream>& to_unregister);
};

class acceptor : public io::endpoint {
class acceptor : public io::endpoint,
public com::centreon::common::grpc::grpc_server_base {
std::shared_ptr<service_impl> _service;
std::unique_ptr<::grpc::Server> _server;

public:
acceptor(const grpc_config::pointer& conf);
Expand Down
6 changes: 3 additions & 3 deletions broker/grpc/inc/com/centreon/broker/grpc/connector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
#define CCB_GRPC_CONNECTOR_HH

#include "com/centreon/broker/io/limit_endpoint.hh"
#include "com/centreon/common/grpc/grpc_client.hh"
#include "grpc_config.hh"

namespace com::centreon::broker {

namespace grpc {
class connector : public io::limit_endpoint {
grpc_config::pointer _conf;
std::shared_ptr<::grpc::Channel> _channel;
class connector : public io::limit_endpoint,
public com::centreon::common::grpc::grpc_client_base {
std::unique_ptr<com::centreon::broker::stream::centreon_bbdo::Stub> _stub;

public:
Expand Down
58 changes: 17 additions & 41 deletions broker/grpc/inc/com/centreon/broker/grpc/grpc_config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@
#ifndef CCB_GRPC_CONFIG_HH
#define CCB_GRPC_CONFIG_HH

namespace com::centreon::broker {
#include "com/centreon/common/grpc/grpc_config.hh"

namespace grpc {
class grpc_config {
public:
enum compression_active { NO = 0, AUTO = 1, YES = 2 };
namespace com::centreon::broker::grpc {

private:
const std::string _hostport;
const bool _crypted = false;
const std::string _certificate, _cert_key, _ca_cert, _authorization;
const std::string _ca_name;
const compression_active _compress;
const int _second_keepalive_interval;
class grpc_config : public com::centreon::common::grpc::grpc_config {
const std::string _authorization;
/**
* @brief when _grpc_serialized is set, bbdo events are sent on the wire
* without bbdo stream serialization
Expand All @@ -43,14 +35,9 @@ class grpc_config {
public:
using pointer = std::shared_ptr<grpc_config>;

grpc_config()
: _compress(NO),
_second_keepalive_interval(30),
_grpc_serialized(false) {}
grpc_config() : _grpc_serialized(false) {}
grpc_config(const std::string& hostp)
: _hostport(hostp),
_compress(NO),
_second_keepalive_interval(30),
: com::centreon::common::grpc::grpc_config(hostp),
_grpc_serialized(false) {}
grpc_config(const std::string& hostp,
bool crypted,
Expand All @@ -59,37 +46,26 @@ class grpc_config {
const std::string& ca_cert,
const std::string& authorization,
const std::string& ca_name,
compression_active compression,
bool compression,
int second_keepalive_interval,
bool grpc_serialized)
: _hostport(hostp),
_crypted(crypted),
_certificate(certificate),
_cert_key(cert_key),
_ca_cert(ca_cert),
: com::centreon::common::grpc::grpc_config(hostp,
crypted,
certificate,
cert_key,
ca_cert,
ca_name,
compression,
second_keepalive_interval),
_authorization(authorization),
_ca_name(ca_name),
_compress(compression),
_second_keepalive_interval(second_keepalive_interval),
_grpc_serialized(grpc_serialized) {}

constexpr const std::string& get_hostport() const { return _hostport; }
constexpr bool is_crypted() const { return _crypted; }
constexpr const std::string& get_cert() const { return _certificate; }
constexpr const std::string& get_key() const { return _cert_key; }
constexpr const std::string& get_ca() const { return _ca_cert; }
constexpr const std::string& get_authorization() const {
return _authorization;
}
const std::string& get_ca_name() const { return _ca_name; }
constexpr compression_active get_compression() const { return _compress; }
constexpr bool get_grpc_serialized() const { return _grpc_serialized; }

int get_second_keepalive_interval() const {
return _second_keepalive_interval;
}
};
}; // namespace grpc
}

} // namespace com::centreon::broker::grpc

#endif // !CCB_GRPC_CONFIG_HH
62 changes: 8 additions & 54 deletions broker/grpc/src/acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,71 +234,25 @@ void service_impl::unregister(
* @param conf
*/
acceptor::acceptor(const grpc_config::pointer& conf)
: io::endpoint(true, {}), _service(std::make_shared<service_impl>(conf)) {
::grpc::ServerBuilder builder;

std::shared_ptr<::grpc::ServerCredentials> server_creds;
if (conf->is_crypted() && !conf->get_cert().empty() &&
!conf->get_key().empty()) {
::grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {
conf->get_key(), conf->get_cert()};

SPDLOG_LOGGER_INFO(
log_v2::grpc(),
"encrypted server listening on {} cert: {}..., key: {}..., ca: {}....",
conf->get_hostport(), conf->get_cert().substr(0, 10),
conf->get_key().substr(0, 10), conf->get_ca().substr(0, 10));

::grpc::SslServerCredentialsOptions ssl_opts;
ssl_opts.pem_root_certs = conf->get_ca();
ssl_opts.pem_key_cert_pairs.push_back(pkcp);

server_creds = ::grpc::SslServerCredentials(ssl_opts);
} else {
SPDLOG_LOGGER_INFO(log_v2::grpc(), "unencrypted server listening on {}",
conf->get_hostport());
server_creds = ::grpc::InsecureServerCredentials();
}
builder.AddListeningPort(conf->get_hostport(), server_creds);
builder.RegisterService(_service.get());
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS,
conf->get_second_keepalive_interval() * 1000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
conf->get_second_keepalive_interval() * 300);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
builder.AddChannelArgument(
GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 60000);

if (conf->get_compression() == grpc_config::YES) {
grpc_compression_algorithm algo = grpc_compression_algorithm_for_level(
GRPC_COMPRESS_LEVEL_HIGH, calc_accept_all_compression_mask());
const char* algo_name;
if (grpc_compression_algorithm_name(algo, &algo_name))
SPDLOG_LOGGER_DEBUG(log_v2::grpc(), "server default compression {}",
algo_name);
else
SPDLOG_LOGGER_DEBUG(log_v2::grpc(), "server default compression unknown");

builder.SetDefaultCompressionAlgorithm(algo);
builder.SetDefaultCompressionLevel(GRPC_COMPRESS_LEVEL_HIGH);
}
_server = std::move(builder.BuildAndStart());
: io::endpoint(true, {}),
com::centreon::common::grpc::grpc_server_base(conf, log_v2::grpc()) {
_init([this](::grpc::ServerBuilder& builder) {
_service = std::make_shared<service_impl>(
std::static_pointer_cast<grpc_config>(get_conf()));
builder.RegisterService(_service.get());
});
}

/**
* @brief Destroy the acceptor::acceptor object
*
*/
acceptor::~acceptor() {
if (_server) {
if (initialized()) {
SPDLOG_LOGGER_DEBUG(log_v2::grpc(), "begin shutdown of acceptor {} ",
_service->get_conf()->get_hostport());
_service->shutdown_all_wait();
_service->shutdown_all_accepted();
_server->Shutdown(std::chrono::system_clock::now() +
std::chrono::seconds(15));
SPDLOG_LOGGER_DEBUG(log_v2::grpc(), "end shutdown of acceptor {} ",
_service->get_conf()->get_hostport());
}
Expand Down
59 changes: 7 additions & 52 deletions broker/grpc/src/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,54 +33,8 @@ using namespace com::centreon::broker::grpc;
* @param port The port used for the connection.
*/
connector::connector(const grpc_config::pointer& conf)
: io::limit_endpoint(false, {}), _conf(conf) {
::grpc::ChannelArguments args;
args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
conf->get_second_keepalive_interval() * 1000);
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
conf->get_second_keepalive_interval() * 300);
args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
if (!conf->get_ca_name().empty())
args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, conf->get_ca_name());
if (conf->get_compression() == grpc_config::YES) {
grpc_compression_algorithm algo = grpc_compression_algorithm_for_level(
GRPC_COMPRESS_LEVEL_HIGH, calc_accept_all_compression_mask());

const char* algo_name;
if (grpc_compression_algorithm_name(algo, &algo_name)) {
log_v2::grpc()->debug("client this={:p} activate compression {}",
static_cast<void*>(this), algo_name);
} else {
log_v2::grpc()->debug("client this={:p} activate compression unknown",
static_cast<void*>(this));
}
args.SetCompressionAlgorithm(algo);
}
std::shared_ptr<::grpc::ChannelCredentials> creds;
if (conf->is_crypted()) {
::grpc::SslCredentialsOptions ssl_opts = {conf->get_ca(), conf->get_key(),
conf->get_cert()};
SPDLOG_LOGGER_INFO(
log_v2::grpc(),
"encrypted connection to {} cert: {}..., key: {}..., ca: {}...",
conf->get_hostport(), conf->get_cert().substr(0, 10),
conf->get_key().substr(0, 10), conf->get_ca().substr(0, 10));
creds = ::grpc::SslCredentials(ssl_opts);
#ifdef CAN_USE_JWT
if (!_conf->get_jwt().empty()) {
std::shared_ptr<::grpc::CallCredentials> jwt =
::grpc::ServiceAccountJWTAccessCredentials(_conf->get_jwt(), 86400);
creds = ::grpc::CompositeChannelCredentials(creds, jwt);
}
#endif
} else {
SPDLOG_LOGGER_INFO(log_v2::grpc(), "unencrypted connection to {}",
conf->get_hostport());
creds = ::grpc::InsecureChannelCredentials();
}

_channel = ::grpc::CreateCustomChannel(conf->get_hostport(), creds, args);
: io::limit_endpoint(false, {}),
com::centreon::common::grpc::grpc_client_base(conf, log_v2::grpc()) {
_stub = std::move(
com::centreon::broker::stream::centreon_bbdo::NewStub(_channel));
}
Expand All @@ -91,14 +45,15 @@ connector::connector(const grpc_config::pointer& conf)
* @return std::unique_ptr<io::stream>
*/
std::shared_ptr<io::stream> connector::open() {
SPDLOG_LOGGER_INFO(log_v2::grpc(), "Connecting to {}", _conf->get_hostport());
SPDLOG_LOGGER_INFO(log_v2::grpc(), "Connecting to {}",
get_conf()->get_hostport());
try {
return limit_endpoint::open();
} catch (const std::exception& e) {
SPDLOG_LOGGER_DEBUG(
log_v2::tcp(),
"Unable to establish the connection to {} (attempt {}): {}",
_conf->get_hostport(), _is_ready_count, e.what());
get_conf()->get_hostport(), _is_ready_count, e.what());
return nullptr;
}
}
Expand Down Expand Up @@ -155,8 +110,8 @@ void client_stream::shutdown() {
* @return std::unique_ptr<io::stream>
*/
std::shared_ptr<io::stream> connector::create_stream() {
std::shared_ptr<client_stream> new_stream =
std::make_shared<client_stream>(_conf);
std::shared_ptr<client_stream> new_stream = std::make_shared<client_stream>(
std::static_pointer_cast<grpc_config>(get_conf()));
client_stream::register_stream(new_stream);
_stub->async()->exchange(&new_stream->get_context(), new_stream.get());
new_stream->start_read();
Expand Down
16 changes: 2 additions & 14 deletions broker/grpc/src/factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,6 @@ io::endpoint* factory::new_endpoint(
if (it != cfg.params.end() && !strcasecmp(it->second.c_str(), "yes"))
compression = true;

grpc_config::compression_active enable_compression;
if (cfg.get_io_type() == config::endpoint::output)
enable_compression = compression ? grpc_config::YES : grpc_config::NO;
else
enable_compression = grpc_config::AUTO;

// keepalive conf
int keepalive_interval = 30;
it = cfg.params.find("keepalive_interval");
Expand All @@ -232,7 +226,7 @@ io::endpoint* factory::new_endpoint(

grpc_config::pointer conf(std::make_shared<grpc_config>(
hostport, encrypted, certificate, certificate_key, certificate_authority,
authorization, ca_name, enable_compression, keepalive_interval,
authorization, ca_name, compression, keepalive_interval,
direct_grpc_serialized(cfg)));

std::unique_ptr<io::endpoint> endp;
Expand Down Expand Up @@ -411,12 +405,6 @@ io::endpoint* factory::_new_endpoint_bbdo_cs(
it->second);
}

grpc_config::compression_active enable_compression;
if (cfg.get_io_type() == config::endpoint::output)
enable_compression = compression ? grpc_config::YES : grpc_config::NO;
else
enable_compression = grpc_config::AUTO;

bool enable_retention = false;
it = cfg.params.find("retention");
if (it != cfg.params.end()) {
Expand Down Expand Up @@ -450,7 +438,7 @@ io::endpoint* factory::_new_endpoint_bbdo_cs(

grpc_config::pointer conf(std::make_shared<grpc_config>(
hostport, encryption, certificate, private_key, ca_certificate,
authorization, ca_name, enable_compression, keepalive_interval,
authorization, ca_name, compression, keepalive_interval,
direct_grpc_serialized(cfg)));

// Acceptor.
Expand Down
8 changes: 4 additions & 4 deletions broker/grpc/test/acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST_F(GrpcTlsTest, TlsStream) {
auto conf{std::make_shared<grpc_config>(
"0.0.0.0:4141", true, read_file("/tmp/server.crt"),
read_file("/tmp/server.key"), read_file("/tmp/client.crt"), "",
"centreon", grpc_config::NO, 30, false)};
"centreon", false, 30, false)};
auto a{std::make_unique<acceptor>(conf)};

/* Nominal case, cbd is acceptor and read on the socket */
Expand Down Expand Up @@ -104,7 +104,7 @@ TEST_F(GrpcTlsTest, TlsStream) {
auto conf{std::make_shared<grpc_config>(
fmt::format("{}:4141", hostname), true, read_file("/tmp/client.crt"),
read_file("/tmp/client.key"), read_file("/tmp/server.crt"), "", "",
grpc_config::NO, 30, false)};
false, 30, false)};
auto c{std::make_unique<connector>(conf)};

/* Nominal case, centengine is connector and write on the socket */
Expand Down Expand Up @@ -157,7 +157,7 @@ TEST_F(GrpcTlsTest, TlsStreamBadCaHostname) {
auto conf{std::make_shared<grpc_config>(
"0.0.0.0:4141", true, read_file("/tmp/server.crt"),
read_file("/tmp/server.key"), read_file("/tmp/client.crt"), "",
"centreon", grpc_config::NO, 30, false)};
"centreon", false, 30, false)};
auto a{std::make_unique<acceptor>(conf)};

/* Nominal case, cbd is acceptor and read on the socket */
Expand All @@ -172,7 +172,7 @@ TEST_F(GrpcTlsTest, TlsStreamBadCaHostname) {
auto conf{std::make_shared<grpc_config>(
"localhost:4141", true, read_file("/tmp/client.crt"),
read_file("/tmp/client.key"), read_file("/tmp/server.crt"), "",
"bad_name", grpc_config::NO, 30, false)};
"bad_name", false, 30, false)};
auto c{std::make_unique<connector>(conf)};

/* Nominal case, centengine is connector and write on the socket */
Expand Down
Loading

0 comments on commit e8cf6bd

Please sign in to comment.